Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use different file-based settings error message for invalid JSON and NotMasterException #116359

Merged
merged 13 commits into from
Nov 8, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,12 @@ final WatchKey enableDirectoryWatcher(WatchKey previousKey, Path settingsDir) th
void processSettingsOnServiceStartAndNotifyListeners() throws InterruptedException {
try {
processFileOnServiceStart();
for (var listener : eventListeners) {
listener.watchedFileChanged();
}
} catch (IOException | ExecutionException e) {
logger.error(() -> "Error processing watched file: " + watchedFile(), e);
onProcessFileChangesException(e);
prdoyle marked this conversation as resolved.
Show resolved Hide resolved
return;
}
for (var listener : eventListeners) {
prdoyle marked this conversation as resolved.
Show resolved Hide resolved
listener.watchedFileChanged();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.file.MasterNodeFileWatchingService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -146,11 +148,20 @@ private void processFileChanges(ReservedStateVersionCheck versionCheck) throws I

@Override
protected void onProcessFileChangesException(Exception e) {
if (e instanceof ExecutionException && e.getCause() instanceof FailedToCommitClusterStateException f) {
logger.error("Unable to commit cluster state", e);
} else {
super.onProcessFileChangesException(e);
if (e instanceof ExecutionException) {
var cause = e.getCause();
if (cause instanceof FailedToCommitClusterStateException) {
logger.error("Unable to commit cluster state", e);
return;
} else if (cause instanceof XContentParseException) {
logger.error("Unable to parse settings", e);
return;
} else if (cause instanceof NotMasterException) {
logger.error("Node is no longer master", e);
return;
}
}
super.onProcessFileChangesException(e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParser;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;
import org.mockito.stubbing.Answer;
Expand All @@ -55,16 +58,22 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -262,6 +271,68 @@ public void testProcessFileChanges() throws Exception {
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_VERSION_ONLY), any());
}

@SuppressWarnings("unchecked")
public void testInvalidJSON() throws Exception {
doAnswer((Answer<Void>) invocation -> {
prdoyle marked this conversation as resolved.
Show resolved Hide resolved
invocation.getArgument(1, XContentParser.class).map(); // Throw if JSON is invalid
((Consumer<Exception>) invocation.getArgument(3)).accept(null);
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

CyclicBarrier fileChangeBarrier = new CyclicBarrier(2);
fileSettingsService.addFileChangedListener(() -> awaitOrBust(fileChangeBarrier));

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
boolean returnedNormally = false;
try {
var result = invocation.callRealMethod();
returnedNormally = true;
return result;
} catch (XContentParseException e) {
// We're expecting a parse error. processFileChanges specifies that this is supposed to throw ExecutionException.
throw new ExecutionException(e);
} catch (Throwable e) {
throw new AssertionError("Unexpected exception", e);
} finally {
if (returnedNormally == false) {
// Because of the exception, listeners aren't notified, so we need to activate the barrier ourselves
awaitOrBust(fileChangeBarrier);
}
}
}).when(fileSettingsService).processFileChanges();

// Establish the initial valid JSON
fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
awaitOrBust(fileChangeBarrier);

// Now break the JSON
writeTestFile(fileSettingsService.watchedFile(), "test_invalid_JSON");
awaitOrBust(fileChangeBarrier);

verify(fileSettingsService, times(1)).processFileOnServiceStart(); // The initial state
verify(fileSettingsService, times(1)).processFileChanges(); // The changed state
verify(fileSettingsService, times(1)).onProcessFileChangesException(
argThat(e -> e instanceof ExecutionException && e.getCause() instanceof XContentParseException)
);

// Note: the name "processFileOnServiceStart" is a bit misleading because it is not
// referring to fileSettingsService.start(). Rather, it is referring to the initialization
// of the watcher thread itself, which occurs asynchronously when clusterChanged is first called.
}

private static void awaitOrBust(CyclicBarrier barrier) {
try {
barrier.await(20, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("Unexpected exception waiting for barrier", e);
}
}

@SuppressWarnings("unchecked")
public void testStopWorksInMiddleOfProcessing() throws Exception {
CountDownLatch processFileLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -356,7 +427,7 @@ private static void writeTestFile(Path path, String contents) throws IOException
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
try {
Files.move(tempFilePath, path, ATOMIC_MOVE);
Files.move(tempFilePath, path, REPLACE_EXISTING, ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
logger.info("Atomic move not available. Falling back on non-atomic move to write [{}]", path.toAbsolutePath());
Files.move(tempFilePath, path);
prdoyle marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -374,4 +445,14 @@ private static void longAwait(CountDownLatch latch) {
fail(e, "longAwait: interrupted waiting for CountDownLatch to reach zero");
}
}

public static Matcher<Object> hasCauseThat(Matcher<? super Throwable> causeMatcher) {
return new FeatureMatcher<Object, Throwable>(causeMatcher, "an exception with cause that", "cause") {
@Override
protected Throwable featureValueOf(Object obj) {
return ((Throwable) obj).getCause();
}
};
}

}
Loading