Skip to content

Commit

Permalink
HIVE-28051: LLAP: cleanup local folders on startup and periodically (#…
Browse files Browse the repository at this point in the history
…5054) (Laszlo Bodor reviewed by Butao Zhang)
  • Loading branch information
abstractdog authored Mar 8, 2024
1 parent bee33d2 commit 03a76ac
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 15 deletions.
11 changes: 11 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -5508,6 +5508,17 @@ public static enum ConfVars {
LLAP_TASK_TIME_SUMMARY(
"hive.llap.task.time.print.summary", false,
"Display queue and runtime of tasks by host for every query executed by the shell."),

LLAP_LOCAL_DIR_CLEANER_CLEANUP_INTERVAL(
    "hive.llap.local.dir.cleaner.cleanup.interval", "2h", new TimeValidator(TimeUnit.HOURS),
    // Fixed: the concatenated description fragments were missing separating spaces,
    // rendering as "files.Under" / "to" + "set" in the generated config docs.
    "Interval by which the LocalDirCleaner service in LLAP daemon checks for stale/old files. " +
    "Under normal circumstances, local files are cleaned up properly, so it's not recommended to " +
    "set this more frequent than a couple of hours. Default is 2 hours."),
LLAP_LOCAL_DIR_CLEANER_FILE_MODIFY_TIME_THRESHOLD("hive.llap.local.dir.cleaner.file.modify.time.threshold", "24h",
    new TimeValidator(TimeUnit.HOURS),
    "Threshold time for LocalDirCleaner: if a regular file's modify time is older than this value, the file gets deleted. " +
    "Defaults to 86400s (1 day), which is a reasonable period for a local file to be considered as a stale one."),

HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
new TimeValidator(TimeUnit.MILLISECONDS),
"Interval for validating triggers during execution of a query. Triggers defined in resource plan will get\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
// AMReporter after the server so that it gets the correct address. It knows how to deal with
// requests before it is started.
addIfService(amReporter);
addIfService(new LocalDirCleaner(localDirs, daemonConf));
}

private static long determineXmxHeadroom(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.daemon.impl;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* LocalDirCleaner is an LLAP daemon service to clean up old local files. Under normal circumstances,
* intermediate/local files are cleaned up (typically at end of the DAG), but daemons crash sometimes,
* and the attached local disk might end up being the same when a new daemon starts (this applies to
* on-prem as well as cloud scenarios).
*/
public class LocalDirCleaner extends AbstractService {
  private static final Logger LOG = LoggerFactory.getLogger(LocalDirCleaner.class);

  /** Local directories to scan; regular files older than the threshold are deleted. */
  private final List<String> localDirs;

  /** Period between cleanup runs, in seconds. */
  private final long cleanupIntervalSec;
  /** Regular files whose last-modified time is older than this many seconds are deleted. */
  private final long fileModifyTimeThresholdSec;

  /** Single-threaded scheduler driving the periodic cleanup task. */
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  /**
   * @param localDirs the LLAP daemon's local directories to keep clean
   * @param conf configuration supplying the cleanup interval and the file-age threshold
   */
  public LocalDirCleaner(String[] localDirs, Configuration conf) {
    super("LocalDirCleaner");
    this.localDirs = Arrays.asList(localDirs);
    this.cleanupIntervalSec = getInterval(conf);
    this.fileModifyTimeThresholdSec = getFileModifyTimeThreshold(conf);
    LOG.info("Initialized local dir cleaner: interval: {}s, threshold: {}s", cleanupIntervalSec,
        fileModifyTimeThresholdSec);
  }

  @Override
  public void serviceStart() throws IOException {
    // Initial delay of 0: run a cleanup pass immediately on daemon startup (to reclaim
    // space left behind by a crashed predecessor), then periodically afterwards.
    scheduler.scheduleAtFixedRate(this::runCleanup, 0, cleanupIntervalSec, TimeUnit.SECONDS);
  }

  @Override
  public void serviceStop() throws IOException {
    // we can shutdown this service now and ignore leftovers, because under normal circumstances,
    // files from the local dirs are cleaned up (so LocalDirCleaner is a best effort utility)
    scheduler.shutdownNow();
  }

  private long getFileModifyTimeThreshold(Configuration conf) {
    return HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_LOCAL_DIR_CLEANER_FILE_MODIFY_TIME_THRESHOLD,
        TimeUnit.SECONDS);
  }

  private long getInterval(Configuration conf) {
    return HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_LOCAL_DIR_CLEANER_CLEANUP_INTERVAL, TimeUnit.SECONDS);
  }

  /**
   * Exception-safe wrapper around {@link #cleanup()}: scheduleAtFixedRate silently
   * cancels all subsequent executions if the task throws, so any unexpected
   * RuntimeException (cleanup itself only handles IOException) must be contained here
   * to keep the periodic cleanup alive for the daemon's lifetime.
   */
  private void runCleanup() {
    try {
      cleanup();
    } catch (Exception e) {
      LOG.warn("Unexpected exception during local dir cleanup", e);
    }
  }

  /** Runs one cleanup pass over all configured local directories. */
  private void cleanup() {
    Instant deleteBefore = Instant.now().minus(fileModifyTimeThresholdSec, ChronoUnit.SECONDS);

    localDirs.forEach(localDir -> cleanupPath(deleteBefore, Paths.get(localDir)));
  }

  /**
   * Recursively deletes regular files under pathLocalDir whose last-modified time is
   * before deleteBefore. Directories are left in place; per-file IO errors are logged
   * and skipped so one bad file cannot abort the whole pass.
   */
  private void cleanupPath(Instant deleteBefore, Path pathLocalDir) {
    LOG.info("Cleaning up files older than {} from {}", deleteBefore, pathLocalDir);

    // Files.walk returns a lazily-populated stream backed by open directory handles:
    // it must be closed, hence the try-with-resources.
    try (Stream<Path> files = Files.walk(pathLocalDir)) {
      files.filter(f -> {
        try {
          FileTime modified = Files.getLastModifiedTime(f);
          LOG.debug("Checking: {}, modified: {}", f, modified);
          return Files.isRegularFile(f) && modified.toInstant().isBefore(deleteBefore);
        } catch (IOException ex) {
          LOG.warn("IOException caught while checking file for deletion", ex);
          return false;
        }
      }).forEach(f -> {
        try {
          LOG.info("Delete old local file: {}", f);
          Files.delete(f);
        } catch (IOException ex) {
          LOG.warn("Failed to delete file", ex);
        }
      });
    } catch (IOException e) {
      LOG.warn("IOException caught while walking over local files", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@
import org.mockito.internal.util.reflection.Fields;
import org.mockito.internal.util.reflection.InstanceField;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -55,6 +60,11 @@ public class TestLlapDaemon {
MetricsUtils.METRICS_PROCESS_NAME
};

public static final String TEST_LOCAL_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestLlapDaemon.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");

private Configuration hiveConf = new HiveConf();

@Mock
Expand All @@ -64,17 +74,19 @@ public class TestLlapDaemon {
private ArgumentCaptor<Iterable<Map.Entry<String, String>>> captor;

private LlapDaemon daemon;
private String[] localDirs = new String[] {TEST_LOCAL_DIR};
private int defaultWebPort = HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT.defaultIntVal;

/**
 * Builds a daemon before each test: registry discovery is pointed at a ZooKeeper
 * quorum on localhost via the "@llap" service-hosts indirection.
 */
@Before
public void setUp() {
  initMocks(this);
  HiveConf.setVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "@llap");
  HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "localhost");

  // NOTE(review): this local declaration shadows the localDirs field, and the diff
  // rendering may have merged pre-/post-change lines here — confirm against the
  // committed file whether daemon construction still belongs in setUp.
  String[] localDirs = new String[1];
  setupConf(hiveConf);
  LlapDaemonInfo.initialize("testDaemon", hiveConf);
  daemon = new LlapDaemon(hiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
      -1, localDirs, 0, false, 0,0, 0, -1, "TestLlapDaemon");
}

/**
 * Applies the configuration shared by the tests in this class: service hosts on
 * localhost and the in-test flag enabled.
 */
private void setupConf(Configuration conf) {
  // The two settings are independent; order is irrelevant.
  HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true);
  HiveConf.setVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "localhost");
}

@After
Expand All @@ -83,26 +95,67 @@ public void tearDown() {
for (String mSource : METRICS_SOURCES) {
ms.unregisterSource(mSource);
}
daemon.shutdown();
if (daemon != null) {
daemon.shutdown();
}
}

// Expects IllegalArgumentException: the LlapDaemon constructor must reject a
// configuration where the IO threadpool size (3) is smaller than the number of
// executors (4).
@Test(expected = IllegalArgumentException.class)
public void testEnforceProperNumberOfIOThreads() throws IOException {
  // NOTE(review): this body appears to contain both the pre- and post-change variants
  // of the test fused together by the diff rendering (a standalone thisDaemon built
  // from thisHiveConf, followed by a daemon built from the shared hiveConf) — verify
  // against the actual committed file which variant is current.
  Configuration thisHiveConf = new HiveConf();
  HiveConf.setVar(thisHiveConf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "@llap");
  HiveConf.setIntVar(thisHiveConf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS, 4);
  HiveConf.setIntVar(thisHiveConf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE, 3);

  LlapDaemon thisDaemon = new LlapDaemon(thisHiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
      -1, new String[1], 0, false, 0,0, 0, -1, "TestLlapDaemon");
  thisDaemon.close();
  HiveConf.setIntVar(hiveConf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE, 3);

  // 4 executors vs. IO pool of 3 — construction should throw before assignment.
  daemon = new LlapDaemon(hiveConf, 4, LlapDaemon.getTotalHeapSize(), true, false,
      -1, new String[1], 0, false, 0,0, 0, defaultWebPort, "TestLlapDaemon");
}

/**
 * End-to-end check of the LocalDirCleaner service: files created before the daemon
 * starts must be removed once the cleaner has run (freshly created files exceed the
 * 1s modify-time threshold by the second pass), while directories are preserved.
 */
@Test
public void testLocalDirCleaner() throws IOException, InterruptedException {
  // Aggressive settings so the test finishes quickly: clean every 2s, delete files
  // older than 1s.
  HiveConf.setTimeVar(hiveConf, HiveConf.ConfVars.LLAP_LOCAL_DIR_CLEANER_CLEANUP_INTERVAL, 2, TimeUnit.SECONDS);
  HiveConf.setTimeVar(hiveConf, HiveConf.ConfVars.LLAP_LOCAL_DIR_CLEANER_FILE_MODIFY_TIME_THRESHOLD, 1,
      TimeUnit.SECONDS);

  createFile(localDirs[0] + "/hive/appcache/file1");
  createFile(localDirs[0] + "/hive/appcache/file2");
  createFile(localDirs[0] + "/file3");

  daemon = new LlapDaemon(hiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
      -1, localDirs, 0, false, 0,0, 0, defaultWebPort, "TestLlapDaemon");
  daemon.init(hiveConf);

  // Files must survive init: the cleaner only runs once the service is started.
  assertFileExists(localDirs[0] + "/hive/appcache/file1", true);
  assertFileExists(localDirs[0] + "/hive/appcache/file2", true);
  assertFileExists(localDirs[0] + "/file3", true);

  daemon.start();

  // Poll instead of a fixed Thread.sleep(5000): finishes as soon as the cleaner has
  // done its work (typically ~2-3s) and tolerates slow CI hosts up to the deadline.
  // The hard assertions below still decide pass/fail.
  waitForDeletion(15000,
      localDirs[0] + "/hive/appcache/file1",
      localDirs[0] + "/hive/appcache/file2",
      localDirs[0] + "/file3");

  assertFileExists(localDirs[0] + "/hive/appcache/file1", false);
  assertFileExists(localDirs[0] + "/hive/appcache/file2", false);
  assertFileExists(localDirs[0] + "/file3", false);

  // folder is preserved
  assertFileExists(localDirs[0] + "/hive/appcache", true);
}

/**
 * Polls until every given path has disappeared or the timeout elapses; falls
 * through either way so the caller's assertions produce the failure message.
 */
private void waitForDeletion(long timeoutMs, String... strPaths) throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (System.currentTimeMillis() < deadline) {
    boolean anyLeft = false;
    for (String strPath : strPaths) {
      if (Files.exists(Paths.get(strPath))) {
        anyLeft = true;
        break;
      }
    }
    if (!anyLeft) {
      return;
    }
    Thread.sleep(200);
  }
}

/**
 * Asserts the presence (or absence) of a path on the local filesystem, with a
 * message naming the path and the violated expectation.
 */
private void assertFileExists(String strPath, boolean exists) {
  boolean actual = Files.exists(Paths.get(strPath));
  String message = strPath + " " + (exists ? "doesn't exist" : "exists");
  assertEquals(message, exists, actual);
}

/**
 * Creates an empty regular file at the given path, creating any missing parent
 * directories first.
 *
 * @throws IOException if a directory or the file cannot be created
 */
private void createFile(String strPath) throws IOException {
  Path target = Paths.get(strPath);
  Files.createDirectories(target.getParent());
  Files.createFile(target);
}

@Test
public void testUpdateRegistration() throws IOException {
// Given
int enabledExecutors = 0;
int enabledQueue = 2;

daemon = new LlapDaemon(hiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
-1, new String[1], 0, false, 0,0, 0, defaultWebPort, "TestLlapDaemon");

trySetMock(daemon, LlapRegistryService.class, mockRegistry);

// When
Expand Down

0 comments on commit 03a76ac

Please sign in to comment.