From 6cf5c2447a687003338bb5ce4797ad9ce66aaf7d Mon Sep 17 00:00:00 2001 From: HunterXHunter <1356469429@qq.com> Date: Sun, 7 Aug 2022 11:40:20 +0800 Subject: [PATCH] =?UTF-8?q?[HUDI-3669]=20Add=20a=20remote=20request=20retr?= =?UTF-8?q?y=20mechanism=20for=20'Remotehoodietablefiles=E2=80=A6=20(#5884?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adding request retry to RemoteHoodieTableFileSystemView. Users can enable using the new configs added. --- .../embedded/EmbeddedTimelineService.java | 5 ++ .../table/view/FileSystemViewManager.java | 3 +- .../view/FileSystemViewStorageConfig.java | 76 +++++++++++++++++++ .../view/RemoteHoodieTableFileSystemView.java | 67 ++++++++++++---- .../apache/hudi/common/util/RetryHelper.java | 46 ++++++----- .../org/apache/hudi/util/StreamerUtil.java | 5 ++ .../TestRemoteHoodieTableFileSystemView.java | 29 +++++++ 7 files changed, 195 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 72f8e29c9fa8..4d5375894d7e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() { .withRemoteServerHost(hostAddr) .withRemoteServerPort(serverPort) .withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs()) + .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled()) + .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers()) + .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs()) + .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs()) + .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions()) .build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 35fda6c416ac..48023d50463d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server=" + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout=" + viewConf.getRemoteTimelineClientTimeoutSecs()); - return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(), - metaClient, viewConf.getRemoteTimelineClientTimeoutSecs()); + return new RemoteHoodieTableFileSystemView(metaClient, viewConf); } public static FileSystemViewManager createViewManager(final HoodieEngineContext context, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index 63f10855bad8..bc835612aa93 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig { .defaultValue(5 * 60) // 5 min .withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server."); + public static final ConfigProperty REMOTE_RETRY_ENABLE = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.enable") + .defaultValue("false") + .sinceVersion("0.12.0") + .withDocumentation("Whether to enable API request retry for remote file system view."); + + public static final ConfigProperty REMOTE_MAX_RETRY_NUMBERS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.max_numbers") + .defaultValue(3) // 3 times + .sinceVersion("0.12.0") + .withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server."); + + public static final ConfigProperty REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.initial_interval_ms") + .defaultValue(100L) + .sinceVersion("0.12.0") + .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage."); + + public static final ConfigProperty REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.max_interval_ms") + .defaultValue(2000L) + .sinceVersion("0.12.0") + .withDocumentation("Maximum amount of time (in ms), to wait for next retry."); + + public static final ConfigProperty RETRY_EXCEPTIONS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.exceptions") + .defaultValue("") + .sinceVersion("0.12.0") + .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. " + + "Default is empty which means retry all the IOException and RuntimeException from Remote Request."); + public static final ConfigProperty REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty .key("hoodie.filesystem.remote.backup.view.enable") .defaultValue("true") // Need to be disabled only for tests. @@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() { return getInt(REMOTE_TIMEOUT_SECS); } + public boolean isRemoteTimelineClientRetryEnabled() { + return getBoolean(REMOTE_RETRY_ENABLE); + } + + public Integer getRemoteTimelineClientMaxRetryNumbers() { + return getInt(REMOTE_MAX_RETRY_NUMBERS); + } + + public Long getRemoteTimelineInitialRetryIntervalMs() { + return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS); + } + + public Long getRemoteTimelineClientMaxRetryIntervalMs() { + return getLong(REMOTE_MAX_RETRY_INTERVAL_MS); + } + + public String getRemoteTimelineClientRetryExceptions() { + return getString(RETRY_EXCEPTIONS); + } + public long getMaxMemoryForFileGroupMap() { long totalMemory = getLong(SPILLABLE_MEMORY); return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile(); @@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout return this; } + public Builder withRemoteTimelineClientRetry(boolean enableRetry) { + fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry)); + return this; + } + + public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) { + fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString()); + return this; + } + + public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) { + fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString()); + return this; + } + + public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) { + fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString()); + return this; + } + + public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) { + fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions); + return this; + } + public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) { fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString()); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 099b79cbba0a..ea51732eb020 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO; import org.apache.hudi.common.table.timeline.dto.TimelineDTO; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -132,22 +133,35 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private boolean closed = false; + private RetryHelper retryHelper; + + private final HttpRequestCheckedFunction urlCheckedFunc; + private enum RequestMethod { GET, POST } public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) { - this(server, port, metaClient, 300); + this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build()); } - public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) { + public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) { this.basePath = metaClient.getBasePath(); - this.serverHost = server; - this.serverPort = port; this.mapper = new ObjectMapper(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - this.timeoutSecs = timeoutSecs; + this.serverHost = viewConf.getRemoteViewServerHost(); + this.serverPort = viewConf.getRemoteViewServerPort(); + this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs(); + this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 1000); + if (viewConf.isRemoteTimelineClientRetryEnabled()) { + retryHelper = new RetryHelper( + viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), + viewConf.getRemoteTimelineClientMaxRetryNumbers(), + viewConf.getRemoteTimelineInitialRetryIntervalMs(), + viewConf.getRemoteTimelineClientRetryExceptions(), + "Sending request"); + } } private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, @@ -165,17 +179,9 @@ private T executeRequest(String requestPath, Map queryParame String url = builder.toString(); LOG.info("Sending request : (" + url + ")"); - Response response; - int timeout = this.timeoutSecs * 1000; // msec - switch (method) { - case GET: - response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute(); - break; - case POST: - default: - response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute(); - break; - } + // Reset url and method, to avoid repeatedly instantiating objects. + urlCheckedFunc.setUrlAndMethod(url, method); + Response response = retryHelper != null ? retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get(); String content = response.returnContent().asString(); return (T) mapper.readValue(content, reference); } @@ -495,4 +501,33 @@ public Option getLatestBaseFile(String partitionPath, String fil throw new HoodieRemoteException(e); } } + + /** + * For remote HTTP requests, to avoid repeatedly instantiating objects. + */ + private class HttpRequestCheckedFunction implements RetryHelper.CheckedFunction { + private String url; + private RequestMethod method; + private final int timeoutMs; + + public void setUrlAndMethod(String url, RequestMethod method) { + this.method = method; + this.url = url; + } + + public HttpRequestCheckedFunction(int timeoutMs) { + this.timeoutMs = timeoutMs; + } + + @Override + public Response get() throws IOException { + switch (method) { + case GET: + return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + case POST: + default: + return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + } + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java index 067c5ee40dad..efa10f38302b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java @@ -18,28 +18,27 @@ package org.apache.hudi.common.util; +import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.stream.Collectors; -public class RetryHelper { +public class RetryHelper implements Serializable { private static final Logger LOG = LogManager.getLogger(RetryHelper.class); - private CheckedFunction func; - private int num; - private long maxIntervalTime; - private long initialIntervalTime = 100L; + private transient CheckedFunction func; + private final int num; + private final long maxIntervalTime; + private final long initialIntervalTime; private String taskInfo = "N/A"; private List> retryExceptionsClasses; - public RetryHelper() { - } - public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) { this.num = maxRetryNumbers; this.initialIntervalTime = initialRetryIntervalMs; @@ -47,18 +46,24 @@ public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRet if (StringUtils.isNullOrEmpty(retryExceptions)) { this.retryExceptionsClasses = new ArrayList<>(); } else { - this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) - .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) - .map(Exception::getClass) - .collect(Collectors.toList()); + try { + this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) + .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) + .map(Exception::getClass) + .collect(Collectors.toList()); + } catch (HoodieException e) { + LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e); + this.retryExceptionsClasses = new ArrayList<>(); + } } } - public RetryHelper(String taskInfo) { + public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) { + this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions); this.taskInfo = taskInfo; } - public RetryHelper tryWith(CheckedFunction func) { + public RetryHelper tryWith(CheckedFunction func) { this.func = func; return this; } @@ -77,14 +82,18 @@ public T start() throws IOException { throw e; } if (retries++ >= num) { - LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e); + String message = "Still failed to " + taskInfo + " after retried " + num + " times."; + LOG.error(message, e); + if (e instanceof IOException) { + throw new IOException(message, e); + } throw e; } - LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e); + LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e); try { Thread.sleep(waitTime); } catch (InterruptedException ex) { - // ignore InterruptedException here + // ignore InterruptedException here } } } @@ -92,6 +101,7 @@ public T start() throws IOException { if (retries > 0) { LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); } + return functionResult; } @@ -123,7 +133,7 @@ private long getWaitTimeExp(int retryCount) { } @FunctionalInterface - public interface CheckedFunction { + public interface CheckedFunction extends Serializable { T get() throws IOException; } } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 7c7cdcc8adb3..c14749c50da2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -430,6 +430,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) + .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled()) + .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers()) + .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs()) + .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs()) + .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions()) .build(); ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); return writeClient; diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index f9a6172b5ec3..55fd9d7f1608 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -28,12 +28,14 @@ import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.jupiter.api.Test; /** * Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it. @@ -64,4 +66,31 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); return view; } + + @Test + public void testRemoteHoodieTableFileSystemViewWithRetry() { + // Service is available. + view.getLatestBaseFiles(); + // Shut down the service. + server.close(); + try { + // Immediately fails and throws a connection refused exception. + view.getLatestBaseFiles(); + } catch (HoodieRemoteException e) { + assert e.getMessage().contains("Connection refused (Connection refused)"); + } + // Enable API request retry for remote file system view. + view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig + .newBuilder() + .withRemoteServerHost("localhost") + .withRemoteServerPort(server.getServerPort()) + .withRemoteTimelineClientRetry(true) + .withRemoteTimelineClientMaxRetryNumbers(4) + .build()); + try { + view.getLatestBaseFiles(); + } catch (HoodieRemoteException e) { + assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times."); + } + } }