From 970a3f52680aa0f97988d731c32a7994c9a088d1 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 25 Jun 2024 18:31:07 +0200 Subject: [PATCH] Add RetryCallable mechanism for remote calls (#31539) --- sdks/java/io/solace/build.gradle | 5 +- .../sdk/io/solace/RetryCallableManager.java | 130 ++++++++++++++ .../io/solace/RetryCallableManagerTest.java | 169 ++++++++++++++++++ 3 files changed, 303 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 506145f35290..8ee12f22ed03 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -35,12 +35,15 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.solace + implementation library.java.google_cloud_core implementation library.java.vendored_guava_32_1_2_jre implementation project(":sdks:java:extensions:avro") implementation library.java.vendored_grpc_1_60_1 implementation library.java.avro permitUnusedDeclared library.java.avro - implementation library.java.vendored_grpc_1_60_1 + implementation library.java.google_api_common + implementation library.java.gax + implementation library.java.threetenbp testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java new file mode 100644 index 000000000000..2244ef67a4f0 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/RetryCallableManager.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import com.google.api.core.NanoClock; +import com.google.api.gax.retrying.RetrySettings; +import com.google.auto.value.AutoValue; +import com.google.cloud.ExceptionHandler; +import com.google.cloud.ExceptionHandler.Interceptor; +import com.google.cloud.RetryHelper; +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.Callable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; + +/** + * A class that manages retrying of callables based on the exceptions they throw. + * + *

This class provides a way to retry a callable if it throws an exception. The retry behavior is + * configurable using {@link RetrySettings}. + * + *

This class is internal and should not be used directly. + */ +@Internal +@AutoValue +public abstract class RetryCallableManager implements Serializable { + + private static final int NUMBER_OF_RETRIES = 4; + private static final int RETRY_INTERVAL_SECONDS = 1; + private static final int RETRY_DELAY_MULTIPLIER = 2; + private static final int MAX_DELAY_SECONDS = + NUMBER_OF_RETRIES * RETRY_DELAY_MULTIPLIER * RETRY_INTERVAL_SECONDS + 1; + + /** Creates a new {@link RetryCallableManager} with default retry settings. */ + public static RetryCallableManager create() { + return builder().build(); + } + + /** + * Method that executes and repeats the execution of the callable argument, if it throws one of + * the exceptions from the exceptionsToIntercept Set. + * + * @param callable The callable to execute. + * @param exceptionsToIntercept The set of exceptions to intercept. + * @param The type of the callable's return value. + * @return The return value of the callable. + */ + public V retryCallable( + Callable callable, Set> exceptionsToIntercept) { + return RetryHelper.runWithRetries( + callable, + getRetrySettings(), + getExceptionHandlerForExceptions(exceptionsToIntercept), + NanoClock.getDefaultClock()); + } + + private ExceptionHandler getExceptionHandlerForExceptions( + Set> exceptionsToIntercept) { + return ExceptionHandler.newBuilder() + .abortOn(RuntimeException.class) + .addInterceptors(new ExceptionSetInterceptor(ImmutableSet.copyOf(exceptionsToIntercept))) + .build(); + } + + abstract RetrySettings getRetrySettings(); + + abstract Builder toBuilder(); + + public static Builder builder() { + return new AutoValue_RetryCallableManager.Builder() + .setRetrySettings( + RetrySettings.newBuilder() + .setInitialRetryDelay(org.threeten.bp.Duration.ofSeconds(RETRY_INTERVAL_SECONDS)) + .setMaxAttempts(NUMBER_OF_RETRIES) + .setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(MAX_DELAY_SECONDS)) + .setRetryDelayMultiplier(RETRY_DELAY_MULTIPLIER) + .build()); + } + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setRetrySettings(RetrySettings retrySettings); + + abstract RetryCallableManager build(); + } + + private static class ExceptionSetInterceptor implements Interceptor { + private static final long serialVersionUID = -8429573586820467828L; + private final Set> exceptionsToIntercept; + + public ExceptionSetInterceptor(Set> exceptionsToIntercept) { + this.exceptionsToIntercept = exceptionsToIntercept; + } + + @Override + public RetryResult afterEval(Exception exception, RetryResult retryResult) { + return Interceptor.RetryResult.CONTINUE_EVALUATION; + } + + @Override + public RetryResult beforeEval(Exception exceptionToEvaluate) { + for (Class exceptionToIntercept : exceptionsToIntercept) { + if (isOf(exceptionToIntercept, exceptionToEvaluate)) { + return Interceptor.RetryResult.RETRY; + } + } + return Interceptor.RetryResult.CONTINUE_EVALUATION; + } + + private boolean isOf(Class clazz, Object obj) { + return clazz.isInstance(obj); + } + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java new file mode 100644 index 000000000000..b934d711e690 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/RetryCallableManagerTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import static org.junit.Assert.assertEquals; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.RetryHelper.RetryHelperException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.junit.Before; +import org.junit.Test; + +public class RetryCallableManagerTest { + private static final int NUMBER_OF_RETRIES = 4; + private static final int RETRY_INTERVAL_SECONDS = 0; + private static final int RETRY_MULTIPLIER = 2; + private static final int MAX_DELAY = 0; + + private RetryCallableManager retryCallableManager; + + @Before + public void setUp() { + retryCallableManager = + RetryCallableManager.builder() + .setRetrySettings( + RetrySettings.newBuilder() + .setInitialRetryDelay( + org.threeten.bp.Duration.ofSeconds(RETRY_INTERVAL_SECONDS)) + .setMaxAttempts(NUMBER_OF_RETRIES) + .setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(MAX_DELAY)) + .setRetryDelayMultiplier(RETRY_MULTIPLIER) + .build()) + .build(); + } + + @Test + public void testRetryCallable_ReturnsExpected() { + AtomicInteger executeCounter = new AtomicInteger(0); + Callable incrementingFunction = + () -> { + executeCounter.incrementAndGet(); + if (executeCounter.get() < 2) { + throw new MyException(); + } + return executeCounter.get(); + }; + Integer result = + retryCallableManager.retryCallable( + incrementingFunction, ImmutableSet.of(MyException.class)); + assertEquals(String.format("Should return 2, instead returned %d.", result), 2, (int) result); + } + + @Test + public void testRetryCallable_RetriesExpectedNumberOfTimes() { + AtomicInteger executeCounter = new AtomicInteger(0); + Callable incrementingFunction = + () -> { + executeCounter.incrementAndGet(); + if (executeCounter.get() < 2) { + throw new MyException(); + } + return executeCounter.get(); + }; + retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class)); + assertEquals( + String.format("Should run 2 times, instead ran %d times.", executeCounter.get()), + 2, + executeCounter.get()); + } + + @Test(expected = RetryHelperException.class) + public void testRetryCallable_ThrowsRetryHelperException() { + Callable incrementingFunction = + () -> { + { + throw new MyException(); + } + }; + retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class)); + } + + @Test + public void testRetryCallable_ExecutionCountIsCorrectAfterMultipleExceptions() { + AtomicInteger executeCounter = new AtomicInteger(0); + Callable incrementingFunction = + () -> { + executeCounter.incrementAndGet(); + throw new MyException(); + }; + try { + retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class)); + } catch (RetryHelperException e) { + // ignore exception to check the executeCounter + } + assertEquals( + String.format("Should execute 4 times, instead executed %d times", executeCounter.get()), + 4, + executeCounter.get()); + } + + @Test(expected = RetryHelperException.class) + public void testRetryCallable_ThrowsRetryHelperExceptionOnUnspecifiedException() { + Callable incrementingFunction = + () -> { + throw new DoNotIgnoreException(); + }; + retryCallableManager.retryCallable(incrementingFunction, ImmutableSet.of(MyException.class)); + } + + @Test + public void testRetryCallable_ChecksForAllDefinedExceptions() { + AtomicInteger executeCounter = new AtomicInteger(0); + Callable incrementingFunction = + () -> { + executeCounter.incrementAndGet(); + if (executeCounter.get() % 2 == 0) { + throw new MyException(); + } else if (executeCounter.get() % 2 == 1) { + throw new AnotherException(); + } + return 0; + }; + try { + retryCallableManager.retryCallable( + incrementingFunction, ImmutableSet.of(MyException.class, AnotherException.class)); + } catch (RetryHelperException e) { + // ignore exception to check the executeCounter + } + assertEquals( + String.format("Should execute 4 times, instead executed %d times", executeCounter.get()), + 4, + executeCounter.get()); + } + + private static class MyException extends Exception { + public MyException() { + super(); + } + } + + private static class AnotherException extends Exception { + public AnotherException() { + super(); + } + } + + private static class DoNotIgnoreException extends Exception { + public DoNotIgnoreException() { + super(); + } + } +}