diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 506145f35290..8ee12f22ed03 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -35,12 +35,15 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.solace
+ implementation library.java.google_cloud_core
implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
- implementation library.java.vendored_grpc_1_60_1
+ implementation library.java.google_api_common
+ implementation library.java.gax
+ implementation library.java.threetenbp
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java
new file mode 100644
index 000000000000..2244ef67a4f0
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.google.api.core.NanoClock;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.ExceptionHandler;
+import com.google.cloud.ExceptionHandler.Interceptor;
+import com.google.cloud.RetryHelper;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * A class that manages retrying of callables based on the exceptions they throw.
+ *
+ *
This class provides a way to retry a callable if it throws an exception. The retry behavior is
+ * configurable using {@link RetrySettings}.
+ *
+ *
This class is internal and should not be used directly.
+ */
+@Internal
+@AutoValue
+public abstract class RetryCallableManager implements Serializable {
+
+ private static final int NUMBER_OF_RETRIES = 4;
+ private static final int RETRY_INTERVAL_SECONDS = 1;
+ private static final int RETRY_DELAY_MULTIPLIER = 2;
+ private static final int MAX_DELAY_SECONDS =
+ NUMBER_OF_RETRIES * RETRY_DELAY_MULTIPLIER * RETRY_INTERVAL_SECONDS + 1;
+
+ /** Creates a new {@link RetryCallableManager} with default retry settings. */
+ public static RetryCallableManager create() {
+ return builder().build();
+ }
+
+ /**
+ * Method that executes and repeats the execution of the callable argument, if it throws one of
+ * the exceptions from the exceptionsToIntercept Set.
+ *
+ * @param callable The callable to execute.
+ * @param exceptionsToIntercept The set of exceptions to intercept.
+ * @param The type of the callable's return value.
+ * @return The return value of the callable.
+ */
+ public V retryCallable(
+ Callable callable, Set> exceptionsToIntercept) {
+ return RetryHelper.runWithRetries(
+ callable,
+ getRetrySettings(),
+ getExceptionHandlerForExceptions(exceptionsToIntercept),
+ NanoClock.getDefaultClock());
+ }
+
+ private ExceptionHandler getExceptionHandlerForExceptions(
+ Set> exceptionsToIntercept) {
+ return ExceptionHandler.newBuilder()
+ .abortOn(RuntimeException.class)
+ .addInterceptors(new ExceptionSetInterceptor(ImmutableSet.copyOf(exceptionsToIntercept)))
+ .build();
+ }
+
+ abstract RetrySettings getRetrySettings();
+
+ abstract Builder toBuilder();
+
+ public static Builder builder() {
+ return new AutoValue_RetryCallableManager.Builder()
+ .setRetrySettings(
+ RetrySettings.newBuilder()
+ .setInitialRetryDelay(org.threeten.bp.Duration.ofSeconds(RETRY_INTERVAL_SECONDS))
+ .setMaxAttempts(NUMBER_OF_RETRIES)
+ .setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(MAX_DELAY_SECONDS))
+ .setRetryDelayMultiplier(RETRY_DELAY_MULTIPLIER)
+ .build());
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ abstract Builder setRetrySettings(RetrySettings retrySettings);
+
+ abstract RetryCallableManager build();
+ }
+
+ private static class ExceptionSetInterceptor implements Interceptor {
+ private static final long serialVersionUID = -8429573586820467828L;
+ private final Set> exceptionsToIntercept;
+
+ public ExceptionSetInterceptor(Set> exceptionsToIntercept) {
+ this.exceptionsToIntercept = exceptionsToIntercept;
+ }
+
+ @Override
+ public RetryResult afterEval(Exception exception, RetryResult retryResult) {
+ return Interceptor.RetryResult.CONTINUE_EVALUATION;
+ }
+
+ @Override
+ public RetryResult beforeEval(Exception exceptionToEvaluate) {
+ for (Class extends Exception> exceptionToIntercept : exceptionsToIntercept) {
+ if (isOf(exceptionToIntercept, exceptionToEvaluate)) {
+ return Interceptor.RetryResult.RETRY;
+ }
+ }
+ return Interceptor.RetryResult.CONTINUE_EVALUATION;
+ }
+
+ private boolean isOf(Class> clazz, Object obj) {
+ return clazz.isInstance(obj);
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java
new file mode 100644
index 000000000000..b934d711e690
--- /dev/null
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.RetryHelper.RetryHelperException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RetryCallableManagerTest {
+ private static final int NUMBER_OF_RETRIES = 4;
+ private static final int RETRY_INTERVAL_SECONDS = 0;
+ private static final int RETRY_MULTIPLIER = 2;
+ private static final int MAX_DELAY = 0;
+
+ private RetryCallableManager retryCallableManager;
+
+ @Before
+ public void setUp() {
+ retryCallableManager =
+ RetryCallableManager.builder()
+ .setRetrySettings(
+ RetrySettings.newBuilder()
+ .setInitialRetryDelay(
+ org.threeten.bp.Duration.ofSeconds(RETRY_INTERVAL_SECONDS))
+ .setMaxAttempts(NUMBER_OF_RETRIES)
+ .setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(MAX_DELAY))
+ .setRetryDelayMultiplier(RETRY_MULTIPLIER)
+ .build())
+ .build();
+ }
+
+ @Test
+ public void testRetryCallable_ReturnsExpected() {
+ AtomicInteger executeCounter = new AtomicInteger(0);
+ Callable incrementingFunction =
+ () -> {
+ executeCounter.incrementAndGet();
+ if (executeCounter.get() < 2) {
+ throw new MyException();
+ }
+ return executeCounter.get();
+ };
+ Integer result =
+ retryCallableManager.retryCallable(
+ incrementingFunction, ImmutableSet.of(MyException.class));
+ assertEquals(String.format("Should return 2, instead returned %d.", result), 2, (int) result);
+ }
+
+ @Test
+ public void testRetryCallable_RetriesExpectedNumberOfTimes() {
+ AtomicInteger executeCounter = new AtomicInteger(0);
+ Callable incrementingFunction =
+ () -> {
+ executeCounter.incrementAndGet();
+ if (executeCounter.get() < 2) {
+ throw new MyException();
+ }
+ return executeCounter.get();
+ };
+ retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class));
+ assertEquals(
+ String.format("Should run 2 times, instead ran %d times.", executeCounter.get()),
+ 2,
+ executeCounter.get());
+ }
+
+ @Test(expected = RetryHelperException.class)
+ public void testRetryCallable_ThrowsRetryHelperException() {
+ Callable incrementingFunction =
+ () -> {
+ {
+ throw new MyException();
+ }
+ };
+ retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class));
+ }
+
+ @Test
+ public void testRetryCallable_ExecutionCountIsCorrectAfterMultipleExceptions() {
+ AtomicInteger executeCounter = new AtomicInteger(0);
+ Callable incrementingFunction =
+ () -> {
+ executeCounter.incrementAndGet();
+ throw new MyException();
+ };
+ try {
+ retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class));
+ } catch (RetryHelperException e) {
+ // ignore exception to check the executeCounter
+ }
+ assertEquals(
+ String.format("Should execute 4 times, instead executed %d times", executeCounter.get()),
+ 4,
+ executeCounter.get());
+ }
+
+ @Test(expected = RetryHelperException.class)
+ public void testRetryCallable_ThrowsRetryHelperExceptionOnUnspecifiedException() {
+ Callable incrementingFunction =
+ () -> {
+ throw new DoNotIgnoreException();
+ };
+ retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class));
+ }
+
+ @Test
+ public void testRetryCallable_ChecksForAllDefinedExceptions() {
+ AtomicInteger executeCounter = new AtomicInteger(0);
+ Callable incrementingFunction =
+ () -> {
+ executeCounter.incrementAndGet();
+ if (executeCounter.get() % 2 == 0) {
+ throw new MyException();
+ } else if (executeCounter.get() % 2 == 1) {
+ throw new AnotherException();
+ }
+ return 0;
+ };
+ try {
+ retryCallableManager.retryCallable(
+ incrementingFunction, ImmutableSet.of(MyException.class, AnotherException.class));
+ } catch (RetryHelperException e) {
+ // ignore exception to check the executeCounter
+ }
+ assertEquals(
+ String.format("Should execute 4 times, instead executed %d times", executeCounter.get()),
+ 4,
+ executeCounter.get());
+ }
+
+ private static class MyException extends Exception {
+ public MyException() {
+ super();
+ }
+ }
+
+ private static class AnotherException extends Exception {
+ public AnotherException() {
+ super();
+ }
+ }
+
+ private static class DoNotIgnoreException extends Exception {
+ public DoNotIgnoreException() {
+ super();
+ }
+ }
+}