Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-3669] Add a remote request retry mechanism for 'Remotehoodietablefiles… #5884

Merged
merged 4 commits into from
Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
.withRemoteServerHost(hostAddr)
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
+ viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
}

public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
.defaultValue(5 * 60) // 5 min
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");

public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.enable")
.defaultValue("false")
.sinceVersion("0.12.0")
.withDocumentation("Whether to enable API request retry for remote file system view.");

public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_numbers")
.defaultValue(3) // 3 times
.sinceVersion("0.12.0")
.withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");

public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
.defaultValue(100L)
.sinceVersion("0.12.0")
.withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");

public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_interval_ms")
.defaultValue(2000L)
.sinceVersion("0.12.0")
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");

public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.exceptions")
.defaultValue("")
.sinceVersion("0.12.0")
.withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+ "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");

public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
.key("hoodie.filesystem.remote.backup.view.enable")
.defaultValue("true") // Need to be disabled only for tests.
Expand Down Expand Up @@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() {
return getInt(REMOTE_TIMEOUT_SECS);
}

public boolean isRemoteTimelineClientRetryEnabled() {
return getBoolean(REMOTE_RETRY_ENABLE);
}

public Integer getRemoteTimelineClientMaxRetryNumbers() {
return getInt(REMOTE_MAX_RETRY_NUMBERS);
}

public Long getRemoteTimelineInitialRetryIntervalMs() {
return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
}

public Long getRemoteTimelineClientMaxRetryIntervalMs() {
return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
}

public String getRemoteTimelineClientRetryExceptions() {
return getString(RETRY_EXCEPTIONS);
}

public long getMaxMemoryForFileGroupMap() {
long totalMemory = getLong(SPILLABLE_MEMORY);
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
Expand Down Expand Up @@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout
return this;
}

public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry));
return this;
}

public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString());
return this;
}

public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString());
return this;
}

public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString());
return this;
}

public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
return this;
}

public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -132,22 +133,35 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,

private boolean closed = false;

private RetryHelper<Response> retryHelper;

private final HttpRequestCheckedFunction urlCheckedFunc;

private enum RequestMethod {
GET, POST
}

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
this(server, port, metaClient, 300);
this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
}
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
this.basePath = metaClient.getBasePath();
this.serverHost = server;
this.serverPort = port;
this.mapper = new ObjectMapper();
this.metaClient = metaClient;
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
this.timeoutSecs = timeoutSecs;
this.serverHost = viewConf.getRemoteViewServerHost();
this.serverPort = viewConf.getRemoteViewServerPort();
this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs();
this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 1000);
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
retryHelper = new RetryHelper(
viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
viewConf.getRemoteTimelineClientMaxRetryNumbers(),
viewConf.getRemoteTimelineInitialRetryIntervalMs(),
viewConf.getRemoteTimelineClientRetryExceptions(),
"Sending request");
}
}

private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
Expand All @@ -165,17 +179,9 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame

String url = builder.toString();
LOG.info("Sending request : (" + url + ")");
Response response;
int timeout = this.timeoutSecs * 1000; // msec
switch (method) {
case GET:
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
case POST:
default:
response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
}
// Reset url and method, to avoid repeatedly instantiating objects.
urlCheckedFunc.setUrlAndMethod(url, method);
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
Response response = retryHelper != null ? retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get();
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
String content = response.returnContent().asString();
return (T) mapper.readValue(content, reference);
}
Expand Down Expand Up @@ -495,4 +501,33 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
throw new HoodieRemoteException(e);
}
}

/**
* For remote HTTP requests, to avoid repeatedly instantiating objects.
*/
private class HttpRequestCheckedFunction implements RetryHelper.CheckedFunction<Response> {
private String url;
private RequestMethod method;
private final int timeoutMs;

public void setUrlAndMethod(String url, RequestMethod method) {
this.method = method;
this.url = url;
}

public HttpRequestCheckedFunction(int timeoutMs) {
this.timeoutMs = timeoutMs;
}

@Override
public Response get() throws IOException {
switch (method) {
case GET:
return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
case POST:
default:
return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,52 @@

package org.apache.hudi.common.util;

import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class RetryHelper<T> {
public class RetryHelper<T> implements Serializable {
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
private CheckedFunction<T> func;
private int num;
private long maxIntervalTime;
private long initialIntervalTime = 100L;
private transient CheckedFunction<T> func;
private final int num;
private final long maxIntervalTime;
private final long initialIntervalTime;
private String taskInfo = "N/A";
private List<? extends Class<? extends Exception>> retryExceptionsClasses;

public RetryHelper() {
}

public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
this.num = maxRetryNumbers;
this.initialIntervalTime = initialRetryIntervalMs;
this.maxIntervalTime = maxRetryIntervalMs;
if (StringUtils.isNullOrEmpty(retryExceptions)) {
this.retryExceptionsClasses = new ArrayList<>();
} else {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
try {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
} catch (HoodieException e) {
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
this.retryExceptionsClasses = new ArrayList<>();
}
}
}

public RetryHelper(String taskInfo) {
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
this.taskInfo = taskInfo;
}

public RetryHelper tryWith(CheckedFunction<T> func) {
public RetryHelper<T> tryWith(CheckedFunction<T> func) {
this.func = func;
return this;
}
Expand All @@ -77,21 +82,26 @@ public T start() throws IOException {
throw e;
}
if (retries++ >= num) {
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
LOG.error(message, e);
if (e instanceof IOException) {
throw new IOException(message, e);
}
throw e;
}
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
try {
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
Thread.sleep(waitTime);
} catch (InterruptedException ex) {
// ignore InterruptedException here
// ignore InterruptedException here
}
}
}

if (retries > 0) {
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
}

return functionResult;
}

Expand Down Expand Up @@ -123,7 +133,7 @@ private long getWaitTimeExp(int retryCount) {
}

@FunctionalInterface
public interface CheckedFunction<T> {
public interface CheckedFunction<T> extends Serializable {
T get() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;

/**
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
Expand Down Expand Up @@ -64,4 +66,31 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
return view;
}

@Test
public void testRemoteHoodieTableFileSystemViewWithRetry() {
// Service is available.
view.getLatestBaseFiles();
// Shut down the service.
server.close();
try {
// Immediately fails and throws a connection refused exception.
view.getLatestBaseFiles();
} catch (HoodieRemoteException e) {
assert e.getMessage().contains("Connection refused (Connection refused)");
}
// Enable API request retry for remote file system view.
view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
.newBuilder()
.withRemoteServerHost("localhost")
.withRemoteServerPort(server.getServerPort())
.withRemoteTimelineClientRetry(true)
.withRemoteTimelineClientMaxRetryNumbers(4)
.build());
try {
view.getLatestBaseFiles();
} catch (HoodieRemoteException e) {
assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.");
}
}
}