Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Pratap Singh committed Jul 17, 2024
1 parent c581fff commit d64067f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,8 @@ public class ConfigurationKeys {
// describes a comma separated list of non transient errors that may come in a gobblin job
// e.g. "invalid_grant,CredentialStoreException"
public static final String GOBBLIN_NON_TRANSIENT_ERRORS = "gobblin.errorMessages.nonTransientErrors";
// Key to store a comma-separated list of exception class names that should be retried
public static final String EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY = "EXCEPTION_LIST_FOR_RETRY";

/**
* Configuration properties related to Flows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.CollectionUtils;


import com.github.rholder.retry.Retryer;
Expand All @@ -39,6 +40,8 @@

import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;



/**
Expand All @@ -58,9 +61,6 @@ public class RetryerFactory<T> {
public static final String RETRY_TYPE = "retry_type";
// value large or equal to 1
public static final String RETRY_TIMES = "retry_times";

// Key to store a comma-separated list of exception class names that should be retried
public static final String EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY = "EXCEPTION_LIST_FOR_RETRY";
public static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE_DEFAULT;

private static final Config DEFAULTS;
Expand Down Expand Up @@ -98,17 +98,15 @@ public static <T> Retryer<T> newInstance(Config config, Optional<RetryListener>

RetryerBuilder<T> builder;

Predicate<Throwable> retryPredicate = getRetryPredicateFromConfigOrDefault(config);

switch (type) {
case EXPONENTIAL:
builder = newExponentialRetryerBuilder(config, retryPredicate);
builder = newExponentialRetryerBuilder(config);
break;
case FIXED:
builder = newFixedRetryerBuilder(config, retryPredicate);
builder = newFixedRetryerBuilder(config);
break;
case FIXED_ATTEMPT:
builder = newFixedAttemptBoundRetryerBuilder(config, retryPredicate);
builder = newFixedAttemptBoundRetryerBuilder(config);
break;
default:
throw new IllegalArgumentException(type + " is not supported");
Expand All @@ -127,34 +125,32 @@ public static <T> Retryer<T> newInstance(Config config, Optional<RetryListener>
@VisibleForTesting
public static Predicate<Throwable> getRetryPredicateFromConfigOrDefault(Config config) {
// Retrieve the list of exception class names from the configuration
List<String> exceptionList = ConfigUtils.getStringList(config, EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY);
List<String> exceptionList = ConfigUtils.getStringList(config, ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY);

// If the exception list is null or empty, return the default retry predicate
if (exceptionList == null || exceptionList.isEmpty()) {
if (CollectionUtils.isEmpty(exceptionList)) {
return RETRY_EXCEPTION_PREDICATE_DEFAULT;
}

// Create a retry predicate by mapping each exception class name to a Predicate
Predicate<Throwable> retryPredicate = exceptionList.stream().map(exceptionClassName -> {
return exceptionList.stream().map(exceptionClassName -> {
try {
Class<?> clazz = Class.forName(exceptionClassName);
if (Exception.class.isAssignableFrom(clazz)) {
// Return a Predicate that checks if a Throwable is an instance of the class
return (Predicate<Throwable>) clazz::isInstance;
} else {
LOG.error("{} is not an exception.", exceptionClassName);
LOG.error("{} is not an exception,ignoring", exceptionClassName);
}
} catch (ClassNotFoundException exception) {
LOG.error("Class not found for the given exception className {}", exceptionClassName, exception);
} catch (Exception exception) {
LOG.error("Failed to instantiate exception {}", exceptionClassName, exception);
} catch (ClassNotFoundException ignored) {
LOG.error("Class not found for the given exception className {},ignoring it", exceptionClassName, ignored);
} catch (Exception ignored) {
LOG.error("Failed to instantiate exception {},ignoring it", exceptionClassName, ignored);
}
return null;
}).filter(Objects::nonNull) // Filter out any null values
.reduce(com.google.common.base.Predicates::or) // Combine all predicates using logical OR
.orElse(RETRY_EXCEPTION_PREDICATE_DEFAULT); // Default to retryExceptionPredicate if no valid predicates are found

return retryPredicate;
}

/**
Expand All @@ -164,28 +160,25 @@ public static <T> Retryer<T> newInstance(Config config) {
return newInstance(config, Optional.empty());
}

private static <T> RetryerBuilder<T> newFixedRetryerBuilder(Config config,
Predicate<Throwable> retryExceptionPredicate) {
private static <T> RetryerBuilder<T> newFixedRetryerBuilder(Config config) {
return RetryerBuilder.<T>newBuilder()
.retryIfException(retryExceptionPredicate)
.retryIfException(getRetryPredicateFromConfigOrDefault(config))
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS), TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS), TimeUnit.MILLISECONDS));
}

private static <T> RetryerBuilder<T> newExponentialRetryerBuilder(Config config,
Predicate<Throwable> retryExceptionPredicate) {
private static <T> RetryerBuilder<T> newExponentialRetryerBuilder(Config config) {
return RetryerBuilder.<T>newBuilder()
.retryIfException(retryExceptionPredicate)
.retryIfException(getRetryPredicateFromConfigOrDefault(config))
.withWaitStrategy(
WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER), config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS), TimeUnit.MILLISECONDS));
}

private static <T> RetryerBuilder<T> newFixedAttemptBoundRetryerBuilder(Config config,
Predicate<Throwable> retryExceptionPredicate) {
private static <T> RetryerBuilder<T> newFixedAttemptBoundRetryerBuilder(Config config) {
return RetryerBuilder.<T>newBuilder()
.retryIfException(retryExceptionPredicate)
.retryIfException(getRetryPredicateFromConfigOrDefault(config))
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS), TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,34 @@

import java.util.Arrays;

import org.mockito.Mockito;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.base.Predicate;
import com.typesafe.config.Config;
import com.google.common.base.Predicate;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;


/**
* Unit tests for the {@link org.apache.gobblin.util.retry.RetryerFactory} class.
*/
public class RetryerFactoryTest {
private static final String EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY = "EXCEPTION_LIST_FOR_RETRY";

@Test
public void testGetRetryPredicateFromConfigOrDefault_withEmptyConfig() {
Config config = Mockito.mock(Config.class);
Mockito.when(config.hasPath(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY)).thenReturn(false);
Config config = ConfigFactory.empty();
Predicate<Throwable> result = RetryerFactory.getRetryPredicateFromConfigOrDefault(config);

Assert.assertEquals(RetryerFactory.RETRY_EXCEPTION_PREDICATE_DEFAULT, result);
}

@Test
public void testGetRetryPredicateFromConfigOrDefault_withValidException() {
Config config = Mockito.mock(Config.class);
Mockito.when(config.hasPath(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY)).thenReturn(true);
Mockito.when(config.getStringList(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY))
.thenReturn(Arrays.asList("java.lang.RuntimeException"));
Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException")));

Predicate<Throwable> result = RetryerFactory.getRetryPredicateFromConfigOrDefault(config);

Expand All @@ -55,10 +55,9 @@ public void testGetRetryPredicateFromConfigOrDefault_withValidException() {

@Test
public void testGetRetryPredicateFromConfigOrDefault_withInvalidException() {
Config config = Mockito.mock(Config.class);
Mockito.when(config.hasPath(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY)).thenReturn(true);
Mockito.when(config.getStringList(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY))
.thenReturn(Arrays.asList("non.existent.Exception"));
Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
ConfigValueFactory.fromAnyRef(Arrays.asList("non.existent.Exception")));

Predicate<Throwable> result = RetryerFactory.getRetryPredicateFromConfigOrDefault(config);

Expand All @@ -67,10 +66,9 @@ public void testGetRetryPredicateFromConfigOrDefault_withInvalidException() {

@Test
public void testGetRetryPredicateFromConfigOrDefault_withMixedExceptions() {
Config config = Mockito.mock(Config.class);
Mockito.when(config.hasPath(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY)).thenReturn(true);
Mockito.when(config.getStringList(EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY))
.thenReturn(Arrays.asList("java.lang.RuntimeException", "non.existent.Exception"));
Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException", "non.existent.Exception")));

Predicate<Throwable> result = RetryerFactory.getRetryPredicateFromConfigOrDefault(config);

Expand Down

0 comments on commit d64067f

Please sign in to comment.