From a7fb05676af5fe537273e3e8de260a70b91a6fd5 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 14 Sep 2022 11:40:22 -0400 Subject: [PATCH 01/19] Update to a single scheduled executor service to mitigate thread propogation --- .../extensions/gcp/options/GcsOptions.java | 18 +++++++++------ .../io/gcp/bigquery/BigQueryServicesImpl.java | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index d183d1647e112..ecc4eb18f1fa5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -147,13 +149,15 @@ public ExecutorService create(PipelineOptions options) { * them in forward order thus requiring enough threads so that each step's writers * can be active. */ - return new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - Long.MAX_VALUE, - TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - new SynchronousQueue<>(), - threadFactoryBuilder.build()); + + return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); + // return new ThreadPoolExecutor( + // 0, + // Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + // Long.MAX_VALUE, + // TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + // new SynchronousQueue<>(), + // threadFactoryBuilder.build()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index be418e7cea94b..126a2081c996d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -29,6 +29,7 @@ import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; +import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedHeaderProvider; @@ -105,6 +106,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1482,12 +1484,32 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option return BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) + .setBackgroundExecutorProvider(new OptionsExecutionProvider(options)) .build()); } catch (Exception e) { throw new RuntimeException(e); } } + private static class OptionsExecutionProvider implements ExecutorProvider { + + private final BigQueryOptions options; + + public OptionsExecutionProvider(BigQueryOptions options){ + this.options = options; + } + + @Override + public boolean shouldAutoClose() { + return false; + } + + @Override + public ScheduledExecutorService getExecutor() { + return (ScheduledExecutorService) options.as(GcsOptions.class).getExecutorService(); + } + } + public static CustomHttpErrors createBigQueryClientCustomErrors() { CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder(); // 403 errors, to list tables, matching this URL: From 1e062e20e855bf36b4beddbc4b63fc82b66f00b1 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 14 Sep 2022 15:13:19 -0400 Subject: [PATCH 02/19] run spotless --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 126a2081c996d..117366474fd82 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1495,7 +1495,7 @@ private static class OptionsExecutionProvider implements ExecutorProvider { private final BigQueryOptions options; - public OptionsExecutionProvider(BigQueryOptions options){ + public OptionsExecutionProvider(BigQueryOptions options) { this.options = options; } From f2c95a9ac9afa8e2c9216f64491a4c042717d0dd Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 14 Sep 2022 16:15:15 -0400 Subject: [PATCH 03/19] run spotless --- .../apache/beam/sdk/extensions/gcp/options/GcsOptions.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index ecc4eb18f1fa5..dd878e8fd106a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -21,10 +21,6 @@ import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; From 5b81a102b9b59281629a610da0a6750271aecc15 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 15 Sep 2022 13:25:18 -0400 Subject: [PATCH 04/19] Provide separate scheduled executor --- .../extensions/gcp/options/GcsOptions.java | 59 ++++++++++++++++--- .../io/gcp/bigquery/BigQueryServicesImpl.java | 2 +- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index dd878e8fd106a..63e0265df0ade 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -21,6 +21,10 @@ import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -66,6 +70,25 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setExecutorService(ExecutorService value); + //TODO update docs + /** + * The ExecutorService instance to use to create threads, can be overridden to specify an + * ExecutorService that is compatible with the user's environment. If unset, the default is to + * create an ExecutorService with an unbounded number of threads; this is compatible with Google + * AppEngine. + */ + @JsonIgnore + @Description( + "The ExecutorService instance to use to create multiple threads. Can be overridden " + + "to specify an ExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create an ExecutorService with an unbounded number of threads; this " + + "is compatible with Google AppEngine.") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Hidden + ScheduledExecutorService getScheduledExecutorService(); + + void setScheduledExecutorService(ScheduledExecutorService value); + /** GCS endpoint to use. If unspecified, uses the default endpoint. */ @JsonIgnore @Hidden @@ -146,14 +169,36 @@ public ExecutorService create(PipelineOptions options) { * can be active. */ + return new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + Long.MAX_VALUE, + TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + new SynchronousQueue<>(), + threadFactoryBuilder.build()); + } + } + + /** + * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link + * ExecutorService} is compatible with AppEngine. + */ + class ScheduledExecutorServiceFactory implements DefaultValueFactory { + @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. + @Override + public ScheduledExecutorService create(PipelineOptions options) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); + threadFactoryBuilder.setDaemon(true); + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); - // return new ThreadPoolExecutor( - // 0, - // Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - // Long.MAX_VALUE, - // TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - // new SynchronousQueue<>(), - // threadFactoryBuilder.build()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 117366474fd82..585c4c3495dbb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1506,7 +1506,7 @@ public boolean shouldAutoClose() { @Override public ScheduledExecutorService getExecutor() { - return (ScheduledExecutorService) options.as(GcsOptions.class).getExecutorService(); + return options.as(GcsOptions.class).getScheduledExecutorService(); } } From 97fe2c2aae94369c0292198e28ceb03308c8de81 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 16 Sep 2022 14:29:06 -0400 Subject: [PATCH 05/19] try scheduled thread pool again --- .../sdk/extensions/gcp/options/GcsOptions.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 63e0265df0ade..e27bf8d7770b7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -70,7 +70,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setExecutorService(ExecutorService value); - //TODO update docs + // TODO update docs /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. If unset, the default is to @@ -168,14 +168,14 @@ public ExecutorService create(PipelineOptions options) { * them in forward order thus requiring enough threads so that each step's writers * can be active. */ - - return new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - Long.MAX_VALUE, - TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - new SynchronousQueue<>(), - threadFactoryBuilder.build()); + return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); + // return new ThreadPoolExecutor( + // 0, + // Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + // Long.MAX_VALUE, + // TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + // new SynchronousQueue<>(), + // threadFactoryBuilder.build()); } } From e60685678b28073e67f4a36ba9d81bda6d264900 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 28 Sep 2022 14:38:18 -0400 Subject: [PATCH 06/19] Attempt to mimic Bigquery default scheduled thread pool --- .../sdk/extensions/gcp/options/GcsOptions.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index e27bf8d7770b7..834fb0b8dee87 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -168,14 +168,14 @@ public ExecutorService create(PipelineOptions options) { * them in forward order thus requiring enough threads so that each step's writers * can be active. */ - return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); - // return new ThreadPoolExecutor( - // 0, - // Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - // Long.MAX_VALUE, - // TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - // new SynchronousQueue<>(), - // threadFactoryBuilder.build()); + // return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); + return new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + Long.MAX_VALUE, + TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + new SynchronousQueue<>(), + threadFactoryBuilder.build()); } } @@ -198,7 +198,7 @@ public ScheduledExecutorService create(PipelineOptions options) { * can be active. */ - return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); + return Executors.newScheduledThreadPool(Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); } } From 62b4d5d48b323e64bf7f19431ea2caf18f95367f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 28 Sep 2022 16:08:06 -0400 Subject: [PATCH 07/19] run spotless --- .../org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 834fb0b8dee87..c77aec323182f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -198,7 +198,8 @@ public ScheduledExecutorService create(PipelineOptions options) { * can be active. */ - return Executors.newScheduledThreadPool(Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); + return Executors.newScheduledThreadPool( + Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); } } From 1de7feba56214e5d1664029a39ee9cf781cb8166 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 28 Sep 2022 16:11:56 -0400 Subject: [PATCH 08/19] update docs --- .../sdk/extensions/gcp/options/GcsOptions.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index c77aec323182f..e6c0e7e2dc308 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -70,19 +70,18 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setExecutorService(ExecutorService value); - // TODO update docs /** - * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the default is to - * create an ExecutorService with an unbounded number of threads; this is compatible with Google - * AppEngine. + * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a + * ScheduledExecutorService that is compatible with the user's environment. If unset, the default + * is to create an ScheduledExecutorService with a core number of threads equal to Math.max(4, + * Runtime.getRuntime().availableProcessors()) */ @JsonIgnore @Description( - "The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") + "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create an ScheduledExecutorService with a core number of threads " + + "equal to Math.max(4, Runtime.getRuntime().availableProcessors())") @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) @Hidden ScheduledExecutorService getScheduledExecutorService(); From 45fa683d0f8388eef5d20b6d59d22907e4e6844f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 29 Sep 2022 13:28:20 -0400 Subject: [PATCH 09/19] Refactor Scheduled executor service to its own options file --- .../beam/sdk/options/ExecutorOptions.java | 56 +++++++++++ .../extensions/gcp/options/GcsOptions.java | 92 ++----------------- .../io/gcp/bigquery/BigQueryServicesImpl.java | 7 +- 3 files changed, 72 insertions(+), 83 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java new file mode 100644 index 0000000000000..eb45e569e25d3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -0,0 +1,56 @@ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; + +public interface ExecutorOptions extends PipelineOptions{ + + /** + * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a + * ScheduledExecutorService that is compatible with the user's environment. If unset, the default + * is to create an ScheduledExecutorService with a core number of threads equal to Math.max(4, + * Runtime.getRuntime().availableProcessors()) + */ + @JsonIgnore + @Description( + "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create a ScheduledExecutorService with a core number of threads " + + "equal to Math.max(4, Runtime.getRuntime().availableProcessors())") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Hidden + ScheduledExecutorService getScheduledExecutorService(); + + void setScheduledExecutorService(ScheduledExecutorService value); + + + /** + * Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. + */ + class ScheduledExecutorServiceFactory implements DefaultValueFactory { + @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. + @Override + public ScheduledExecutorService create(PipelineOptions options) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); + threadFactoryBuilder.setDaemon(true); + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + + /*The minimum of max(4, processors) was chosen as a default working configuration found in + * the Bigquery client library + */ + return Executors.newScheduledThreadPool( + Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); + } + } + +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index e6c0e7e2dc308..3d2ec566e0168 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -20,11 +20,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -34,15 +29,14 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.checkerframework.checker.nullness.qual.Nullable; /** Options used to configure Google Cloud Storage. */ -public interface GcsOptions extends ApplicationNameOptions, GcpOptions, PipelineOptions { +public interface GcsOptions extends ApplicationNameOptions, GcpOptions, ExecutorOptions, PipelineOptions { /** The GcsUtil instance that should be used to communicate with Google Cloud Storage. */ @JsonIgnore @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") @@ -55,38 +49,25 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. If unset, the default is to - * create an ExecutorService with an unbounded number of threads; this is compatible with Google - * AppEngine. + * create a ScheduledExecutorService with a core number of threads equal to + * Math.max(4, Runtime.getRuntime().availableProcessors()). Deprecated in favor of + * getScheduledExecutorService instead */ @JsonIgnore @Description( "The ExecutorService instance to use to create multiple threads. Can be overridden " + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") - @Default.InstanceFactory(ExecutorServiceFactory.class) + + "the default is to create a ScheduledExecutorService with a core number of threads" + + "equal to Math.max(4, Runtime.getRuntime().availableProcessors()).") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) @Hidden + @Deprecated ExecutorService getExecutorService(); + @Deprecated void setExecutorService(ExecutorService value); - /** - * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a - * ScheduledExecutorService that is compatible with the user's environment. If unset, the default - * is to create an ScheduledExecutorService with a core number of threads equal to Math.max(4, - * Runtime.getRuntime().availableProcessors()) - */ - @JsonIgnore - @Description( - "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " - + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ScheduledExecutorService with a core number of threads " - + "equal to Math.max(4, Runtime.getRuntime().availableProcessors())") - @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) - @Hidden - ScheduledExecutorService getScheduledExecutorService(); - void setScheduledExecutorService(ScheduledExecutorService value); /** GCS endpoint to use. If unspecified, uses the default endpoint. */ @JsonIgnore @@ -149,59 +130,6 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics); - /** - * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link - * ExecutorService} is compatible with AppEngine. - */ - class ExecutorServiceFactory implements DefaultValueFactory { - @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. - @Override - public ExecutorService create(PipelineOptions options) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); - threadFactoryBuilder.setDaemon(true); - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - // return Executors.newScheduledThreadPool(0, threadFactoryBuilder.build()); - return new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - Long.MAX_VALUE, - TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - new SynchronousQueue<>(), - threadFactoryBuilder.build()); - } - } - - /** - * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link - * ExecutorService} is compatible with AppEngine. - */ - class ScheduledExecutorServiceFactory implements DefaultValueFactory { - @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. - @Override - public ScheduledExecutorService create(PipelineOptions options) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); - threadFactoryBuilder.setDaemon(true); - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - - return Executors.newScheduledThreadPool( - Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); - } - } - /** * Creates a {@link PathValidator} object using the class specified in {@link * #getPathValidatorClass()}. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 585c4c3495dbb..86338d6889861 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -122,6 +122,7 @@ import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.FluentBackoff; @@ -1491,6 +1492,10 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option } } + /** + * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide + * {@link ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient} + */ private static class OptionsExecutionProvider implements ExecutorProvider { private final BigQueryOptions options; @@ -1506,7 +1511,7 @@ public boolean shouldAutoClose() { @Override public ScheduledExecutorService getExecutor() { - return options.as(GcsOptions.class).getScheduledExecutorService(); + return options.as(ExecutorOptions.class).getScheduledExecutorService(); } } From d68b2a17b15d1b5b9883865c28a53d97fb9293bc Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 29 Sep 2022 17:40:47 -0400 Subject: [PATCH 10/19] clean up comments, don't accidentally create two scheduled executor services --- .../beam/sdk/options/ExecutorOptions.java | 33 +++++++++++------ .../extensions/gcp/options/GcsOptions.java | 35 ++++++++++++------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index eb45e569e25d3..a5464fb28642a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.options; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -6,7 +23,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; -public interface ExecutorOptions extends PipelineOptions{ +public interface ExecutorOptions extends PipelineOptions { /** * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a @@ -19,17 +36,14 @@ public interface ExecutorOptions extends PipelineOptions{ "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + "the default is to create a ScheduledExecutorService with a core number of threads " - + "equal to Math.max(4, Runtime.getRuntime().availableProcessors())") + + "equal to {@code Math.max(4, Runtime.getRuntime().availableProcessors()) }.") @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) @Hidden ScheduledExecutorService getScheduledExecutorService(); void setScheduledExecutorService(ScheduledExecutorService value); - - /** - * Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. - */ + /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ class ScheduledExecutorServiceFactory implements DefaultValueFactory { @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. @Override @@ -43,14 +57,11 @@ public ScheduledExecutorService create(PipelineOptions options) { * Also, the MapTaskExecutor launches the steps in reverse order and completes * them in forward order thus requiring enough threads so that each step's writers * can be active. + * The minimum of max(4, processors) was chosen as a default working configuration found in + * the Bigquery client library */ - - /*The minimum of max(4, processors) was chosen as a default working configuration found in - * the Bigquery client library - */ return Executors.newScheduledThreadPool( Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); } } - } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 3d2ec566e0168..7d5b17915ee7e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -36,7 +37,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** Options used to configure Google Cloud Storage. */ -public interface GcsOptions extends ApplicationNameOptions, GcpOptions, ExecutorOptions, PipelineOptions { +public interface GcsOptions extends ApplicationNameOptions, GcpOptions, PipelineOptions { /** The GcsUtil instance that should be used to communicate with Google Cloud Storage. */ @JsonIgnore @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") @@ -48,27 +49,24 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Executor /** * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the default is to - * create a ScheduledExecutorService with a core number of threads equal to - * Math.max(4, Runtime.getRuntime().availableProcessors()). Deprecated in favor of - * getScheduledExecutorService instead + * ExecutorService that is compatible with the user's environment. If unset, the default is to use + * {@link ExecutorOptions} default ScheduledExecutorService. + * + * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead instead */ @JsonIgnore - @Description( - "The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create a ScheduledExecutorService with a core number of threads" - + "equal to Math.max(4, Runtime.getRuntime().availableProcessors()).") - @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Default.InstanceFactory(ExecutorServiceFactory.class) @Hidden @Deprecated ExecutorService getExecutorService(); + /** + * @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)} + * instead + */ @Deprecated void setExecutorService(ExecutorService value); - - /** GCS endpoint to use. If unspecified, uses the default endpoint. */ @JsonIgnore @Hidden @@ -130,6 +128,17 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Executor void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics); + /** + * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link + * ExecutorService} is compatible with AppEngine. + */ + class ExecutorServiceFactory implements DefaultValueFactory { + @Override + public ExecutorService create(PipelineOptions options) { + return options.as(ExecutorOptions.class).getScheduledExecutorService(); + } + } + /** * Creates a {@link PathValidator} object using the class specified in {@link * #getPathValidatorClass()}. From 863f547c82733d4a9be2d4e2c473c5bb7f79e92f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 30 Sep 2022 13:00:54 -0400 Subject: [PATCH 11/19] clean up comments --- .../org/apache/beam/sdk/options/ExecutorOptions.java | 12 ++++++------ .../beam/sdk/extensions/gcp/options/GcsOptions.java | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index a5464fb28642a..fca08364999e2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -26,17 +26,17 @@ public interface ExecutorOptions extends PipelineOptions { /** - * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a - * ScheduledExecutorService that is compatible with the user's environment. If unset, the default - * is to create an ScheduledExecutorService with a core number of threads equal to Math.max(4, - * Runtime.getRuntime().availableProcessors()) + * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to + * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If + * unset, the default is to create an {@link ScheduledExecutorService} with a core number of + * threads equal to {@code Math.max(4,Runtime.getRuntime().availableProcessors())}. */ @JsonIgnore @Description( "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + "the default is to create a ScheduledExecutorService with a core number of threads " - + "equal to {@code Math.max(4, Runtime.getRuntime().availableProcessors()) }.") + + "equal to Math.max(4, Runtime.getRuntime().availableProcessors()).") @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) @Hidden ScheduledExecutorService getScheduledExecutorService(); @@ -57,7 +57,7 @@ public ScheduledExecutorService create(PipelineOptions options) { * Also, the MapTaskExecutor launches the steps in reverse order and completes * them in forward order thus requiring enough threads so that each step's writers * can be active. - * The minimum of max(4, processors) was chosen as a default working configuration found in + *

The minimum of max(4, processors) was chosen as a default working configuration found in * the Bigquery client library */ return Executors.newScheduledThreadPool( diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 7d5b17915ee7e..0ea21dec76e39 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -50,9 +50,9 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. If unset, the default is to use - * {@link ExecutorOptions} default ScheduledExecutorService. + * {@link ExecutorOptions#getScheduledExecutorService()}. * - * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead instead + * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead */ @JsonIgnore @Default.InstanceFactory(ExecutorServiceFactory.class) @@ -61,7 +61,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline ExecutorService getExecutorService(); /** - * @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)} + * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} * instead */ @Deprecated From b3ce30382b844f3d9f0926729abca20193f0056a Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 30 Sep 2022 13:08:51 -0400 Subject: [PATCH 12/19] run spotless --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 86338d6889861..bc644a923577e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1493,8 +1493,8 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option } /** - * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide - * {@link ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient} + * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link + * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient} */ private static class OptionsExecutionProvider implements ExecutorProvider { From de9acee12cdef8144164753146324d39226b671a Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 5 Oct 2022 12:25:26 -0400 Subject: [PATCH 13/19] Configure scheduled executor to spin up and down core threads to mimic a dynamic thread pool --- .../org/apache/beam/sdk/options/ExecutorOptions.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index fca08364999e2..c6d49828e7648 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -18,8 +18,9 @@ package org.apache.beam.sdk.options; import com.fasterxml.jackson.annotation.JsonIgnore; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -60,8 +61,10 @@ public ScheduledExecutorService create(PipelineOptions options) { *

The minimum of max(4, processors) was chosen as a default working configuration found in * the Bigquery client library */ - return Executors.newScheduledThreadPool( - Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Integer.MAX_VALUE, threadFactoryBuilder.build()); + executor.setKeepAliveTime(1, TimeUnit.MINUTES); + executor.allowCoreThreadTimeOut(true); + return executor; } } } From 100cb78f2e3ed521be760760bf8836670d8b5dd5 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 6 Oct 2022 11:32:10 -0400 Subject: [PATCH 14/19] Run spotless --- .../java/org/apache/beam/sdk/options/ExecutorOptions.java | 3 ++- .../apache/beam/sdk/extensions/gcp/options/GcsOptions.java | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index c6d49828e7648..d31449eeede4e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -61,7 +61,8 @@ public ScheduledExecutorService create(PipelineOptions options) { *

The minimum of max(4, processors) was chosen as a default working configuration found in * the Bigquery client library */ - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Integer.MAX_VALUE, threadFactoryBuilder.build()); + ScheduledThreadPoolExecutor executor = + new ScheduledThreadPoolExecutor(Integer.MAX_VALUE, threadFactoryBuilder.build()); executor.setKeepAliveTime(1, TimeUnit.MINUTES); executor.allowCoreThreadTimeOut(true); return executor; diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 0ea21dec76e39..e4aa26305ccc2 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -60,10 +59,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline @Deprecated ExecutorService getExecutorService(); - /** - * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} - * instead - */ + /** @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead */ @Deprecated void setExecutorService(ExecutorService value); From 3446728f48c3a154000c871e1710eb1b21f5edbe Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 6 Oct 2022 13:53:16 -0400 Subject: [PATCH 15/19] add . for checkstyle --- runners/google-cloud-dataflow-java/build.gradle | 3 ++- .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 399f45f5b21f0..d42811e26b26f 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -285,7 +285,8 @@ def buildAndPushDockerJavaContainer = tasks.register("buildAndPushDockerJavaCont commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerJavaImageName}" } exec { - commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" +// commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" + commandLine "docker", "push", "${dockerJavaImageName}" } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index bc644a923577e..002e307012ef2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1494,7 +1494,7 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option /** * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link - * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient} + * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient}. */ private static class OptionsExecutionProvider implements ExecutorProvider { From 8d9bb756d24ecdc9192bc89e20ef7706ccadeb8a Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 14 Oct 2022 14:33:46 -0400 Subject: [PATCH 16/19] Update ExecutorOptions to use @lukecwik's unbounded scheduled executor --- .../beam/sdk/options/ExecutorOptions.java | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index d31449eeede4e..97ca360e8119e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -19,10 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; public interface ExecutorOptions extends PipelineOptions { @@ -46,26 +43,9 @@ public interface ExecutorOptions extends PipelineOptions { /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ class ScheduledExecutorServiceFactory implements DefaultValueFactory { - @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. @Override public ScheduledExecutorService create(PipelineOptions options) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); - threadFactoryBuilder.setDaemon(true); - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - *

The minimum of max(4, processors) was chosen as a default working configuration found in - * the Bigquery client library - */ - ScheduledThreadPoolExecutor executor = - new ScheduledThreadPoolExecutor(Integer.MAX_VALUE, threadFactoryBuilder.build()); - executor.setKeepAliveTime(1, TimeUnit.MINUTES); - executor.allowCoreThreadTimeOut(true); - return executor; + return new UnboundedScheduledExecutorService(); } } } From a59e30a84957e875b480924d4ad50f5476489931 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 14 Oct 2022 15:59:41 -0400 Subject: [PATCH 17/19] revert change to runners/google-cloud-dataflow-java/build.gradle --- runners/google-cloud-dataflow-java/build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 04ededc8ecd4b..8429bb40816aa 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -287,8 +287,7 @@ def buildAndPushDockerJavaContainer = tasks.register("buildAndPushDockerJavaCont commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerJavaImageName}" } exec { -// commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" - commandLine "docker", "push", "${dockerJavaImageName}" + commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" } } } From eb882102e519a0db6cea12d89776bf471023dd85 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 14 Oct 2022 16:06:29 -0400 Subject: [PATCH 18/19] Update comments on options files --- .../apache/beam/sdk/options/ExecutorOptions.java | 16 ++++++++++++---- .../sdk/extensions/gcp/options/GcsOptions.java | 3 ++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java index 97ca360e8119e..2037d21742265 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -21,20 +21,21 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; +/** + * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. + */ public interface ExecutorOptions extends PipelineOptions { /** * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If - * unset, the default is to create an {@link ScheduledExecutorService} with a core number of - * threads equal to {@code Math.max(4,Runtime.getRuntime().availableProcessors())}. + * unset, the default is to create an {@link UnboundedScheduledExecutorService}. */ @JsonIgnore @Description( "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create a ScheduledExecutorService with a core number of threads " - + "equal to Math.max(4, Runtime.getRuntime().availableProcessors()).") + + "the default is to create an UnboundedScheduledExecutorService.") @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) @Hidden ScheduledExecutorService getScheduledExecutorService(); @@ -45,6 +46,13 @@ public interface ExecutorOptions extends PipelineOptions { class ScheduledExecutorServiceFactory implements DefaultValueFactory { @Override public ScheduledExecutorService create(PipelineOptions options) { + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ return new UnboundedScheduledExecutorService(); } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index e4aa26305ccc2..503c994778a45 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -59,7 +59,8 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline @Deprecated ExecutorService getExecutorService(); - /** @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead */ + /** @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may + * result in multiple ExecutorServices, and therefor thread pools, in the runtime */ @Deprecated void setExecutorService(ExecutorService value); From cc869f782f633621cdee8f7832e9da3f74f3bdac Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Fri, 14 Oct 2022 15:50:08 -0700 Subject: [PATCH 19/19] Update sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java --- .../apache/beam/sdk/extensions/gcp/options/GcsOptions.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 503c994778a45..fea7be7f5c723 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -59,8 +59,10 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline @Deprecated ExecutorService getExecutorService(); - /** @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may - * result in multiple ExecutorServices, and therefor thread pools, in the runtime */ + /** + * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may + * result in multiple ExecutorServices, and therefore thread pools, in the runtime. + */ @Deprecated void setExecutorService(ExecutorService value);