Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-28931: Poll SSL cert file for changes (#6381)" to branch-2 #6536

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,41 @@
package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.Consumer;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Instances of this class can be used to watch a directory for file changes. When a file is added
* to, deleted from, or is modified in the given directory, the callback provided by the user will
* be called from a background thread. Some things to keep in mind:
* Instances of this class can be used to watch a file for changes. When a file's modification time
* changes, the callback provided by the user will be called from a background thread. Modification
* are detected by checking the file's attributes every polling interval. Some things to keep in
* mind:
* <ul>
* <li>The callback should be thread-safe.</li>
* <li>Changes that happen around the time the thread is started may be missed.</li>
* <li>There is a delay between a file changing and the callback firing.</li>
* <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
* </ul>
* <p/>
* This file has been copied from the Apache ZooKeeper project.
* This file was originally copied from the Apache ZooKeeper project, and then modified.
* @see <a href=
* "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
* revision</a>
*/
@InterfaceAudience.Private
public final class FileChangeWatcher {

public interface FileChangeWatcherCallback {
void callback(Path path);
}

private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);

public enum State {
enum State {
NEW, // object created but start() not called yet
STARTING, // start() called but background thread has not entered main loop
RUNNING, // background thread is running
Expand All @@ -61,37 +62,37 @@ public enum State {

private final WatcherThread watcherThread;
private State state; // protected by synchronized(this)
private FileTime lastModifiedTime;
private final Object lastModifiedTimeLock;
private final Path filePath;
private final Duration pollInterval;

/**
* Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on
* Creates a watcher that watches <code>filePath</code> and invokes <code>callback</code> on
* changes.
* @param dirPath the directory to watch.
* @param filePath the file to watch.
* @param callback the callback to invoke with events. <code>event.kind()</code> will return the
* type of event, and <code>event.context()</code> will return the filename
* relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback)
throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();

LOG.debug("Registering with watch service: {}", dirPath);
public FileChangeWatcher(Path filePath, String threadNameSuffix, Duration pollInterval,
FileChangeWatcherCallback callback) throws IOException {
this.filePath = filePath;
this.pollInterval = pollInterval;

dirPath.register(watchService,
new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW });
state = State.NEW;
this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback);
lastModifiedTimeLock = new Object();
lastModifiedTime = Files.readAttributes(filePath, BasicFileAttributes.class).lastModifiedTime();
this.watcherThread = new WatcherThread(threadNameSuffix, callback);
this.watcherThread.setDaemon(true);
}

/**
* Returns the current {@link FileChangeWatcher.State}.
* @return the current state.
*/
public synchronized State getState() {
private synchronized State getState() {
return state;
}

Expand Down Expand Up @@ -187,21 +188,18 @@ private class WatcherThread extends Thread {

private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";

final WatchService watchService;
final Consumer<WatchEvent<?>> callback;
final FileChangeWatcherCallback callback;

WatcherThread(String threadNameSuffix, WatchService watchService,
Consumer<WatchEvent<?>> callback) {
WatcherThread(String threadNameSuffix, FileChangeWatcherCallback callback) {
super(THREAD_NAME_PREFIX + threadNameSuffix);
this.watchService = watchService;
this.callback = callback;
setUncaughtExceptionHandler(FileChangeWatcher::handleException);
}

@Override
public void run() {
try {
LOG.info("{} thread started", getName());
LOG.debug("{} thread started", getName());
if (
!compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
) {
Expand All @@ -216,44 +214,40 @@ public void run() {
runLoop();
} catch (Exception e) {
LOG.warn("Error in runLoop()", e);
throw e;
throw new RuntimeException(e);
} finally {
try {
watchService.close();
} catch (IOException e) {
LOG.warn("Error closing watch service", e);
}
LOG.info("{} thread finished", getName());
LOG.debug("{} thread finished", getName());
FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
}
}

private void runLoop() {
private void runLoop() throws IOException {
while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
WatchKey key;
try {
key = watchService.take();
} catch (InterruptedException | ClosedWatchServiceException e) {
LOG.debug("{} was interrupted and is shutting down...", getName());
break;
BasicFileAttributes attributes = Files.readAttributes(filePath, BasicFileAttributes.class);
boolean modified = false;
synchronized (lastModifiedTimeLock) {
FileTime maybeNewLastModifiedTime = attributes.lastModifiedTime();
if (!lastModifiedTime.equals(maybeNewLastModifiedTime)) {
modified = true;
lastModifiedTime = maybeNewLastModifiedTime;
}
}
for (WatchEvent<?> event : key.pollEvents()) {
LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context());

// avoid calling callback while holding lock
if (modified) {
try {
callback.accept(event);
callback.callback(filePath);
} catch (Throwable e) {
LOG.error("Error from callback", e);
}
}
boolean isKeyValid = key.reset();
if (!isKeyValid) {
// This is likely a problem, it means that file reloading is broken, probably because the
// directory we are watching was deleted or otherwise became inaccessible (unmounted,
// permissions
// changed, ???).
// For now, we log an error and exit the watcher thread.
LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
break;

try {
Thread.sleep(pollInterval.toMillis());
} catch (InterruptedException e) {
LOG.debug("Interrupted", e);
Thread.currentThread().interrupt();
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.PKIXBuilderParameters;
import java.security.cert.X509CertSelector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -155,6 +154,8 @@ private static String[] getCBCCiphers() {

private static final String[] DEFAULT_CIPHERS_OPENSSL = getOpenSslFilteredDefaultCiphers();

private static final Duration FILE_POLL_INTERVAL = Duration.ofMinutes(1);

/**
* Not all of our default ciphers are available in OpenSSL. Takes our default cipher lists and
* filters them to only those available in OpenSsl. Prefers TLS 1.3, then GCM, then CBC because
Expand Down Expand Up @@ -510,50 +511,19 @@ private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runna
return null;
}
final Path filePath = Paths.get(fileLocation).toAbsolutePath();
Path parentPath = filePath.getParent();
if (parentPath == null) {
throw new IOException("Key/trust store path does not have a parent: " + filePath);
}
FileChangeWatcher fileChangeWatcher =
new FileChangeWatcher(parentPath, Objects.toString(filePath.getFileName()), watchEvent -> {
handleWatchEvent(filePath, watchEvent, resetContext);
});
new FileChangeWatcher(filePath, Objects.toString(filePath.getFileName()), FILE_POLL_INTERVAL,
watchEventFilePath -> handleWatchEvent(watchEventFilePath, resetContext));
fileChangeWatcher.start();
return fileChangeWatcher;
}

/**
* Handler for watch events that let us know a file we may care about has changed on disk.
* @param filePath the path to the file we are watching for changes.
* @param event the WatchEvent.
*/
private static void handleWatchEvent(Path filePath, WatchEvent<?> event, Runnable resetContext) {
boolean shouldResetContext = false;
Path dirPath = filePath.getParent();
if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
// If we get notified about possibly missed events, reload the key store / trust store just to
// be sure.
shouldResetContext = true;
} else if (
event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY)
|| event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)
) {
Path eventFilePath = dirPath.resolve((Path) event.context());
if (filePath.equals(eventFilePath)) {
shouldResetContext = true;
}
}
// Note: we don't care about delete events
if (shouldResetContext) {
LOG.info(
"Attempting to reset default SSL context after receiving watch event: {} with context: {}",
event.kind(), event.context());
resetContext.run();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring watch event and keeping previous default SSL context. "
+ "Event kind: {} with context: {}", event.kind(), event.context());
}
}
private static void handleWatchEvent(Path filePath, Runnable resetContext) {
LOG.info("Attempting to reset default SSL context after receiving watch event on file {}",
filePath);
resetContext.run();
}
}
Loading