Skip to content

Commit

Permalink
Use different file-based settings error message for invalid JSON and …
Browse files Browse the repository at this point in the history
…NotMasterException (#116359)

* Fixup: remove unused pattern variable from before

* Try1 handle XContentParseException

* Mocks wrap XContentParseException in ExecutionException like the real code does

* onProcessFileChangesException case for XContentParseException

* Handle NotMasterException while we're at it.

* Cleanup

* Use Nikolaj's addFileChangedListener approach to test

* Add REPLACE_EXISTING

* Remove ATOMIC_MOVE

Co-authored-by: Nikolaj Volgushev <n1v0lg@users.noreply.github.com>

* Delete stray generated files

* Remove unused method

---------

Co-authored-by: Nikolaj Volgushev <n1v0lg@users.noreply.github.com>
  • Loading branch information
2 people authored and jozala committed Nov 13, 2024
1 parent afcb513 commit da41a52
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,12 @@ final WatchKey enableDirectoryWatcher(WatchKey previousKey, Path settingsDir) th
void processSettingsOnServiceStartAndNotifyListeners() throws InterruptedException {
try {
processFileOnServiceStart();
for (var listener : eventListeners) {
listener.watchedFileChanged();
}
} catch (IOException | ExecutionException e) {
logger.error(() -> "Error processing watched file: " + watchedFile(), e);
onProcessFileChangesException(e);
return;
}
for (var listener : eventListeners) {
listener.watchedFileChanged();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.file.MasterNodeFileWatchingService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -146,11 +148,20 @@ private void processFileChanges(ReservedStateVersionCheck versionCheck) throws I

@Override
protected void onProcessFileChangesException(Exception e) {
if (e instanceof ExecutionException && e.getCause() instanceof FailedToCommitClusterStateException f) {
logger.error("Unable to commit cluster state", e);
} else {
super.onProcessFileChangesException(e);
if (e instanceof ExecutionException) {
var cause = e.getCause();
if (cause instanceof FailedToCommitClusterStateException) {
logger.error("Unable to commit cluster state", e);
return;
} else if (cause instanceof XContentParseException) {
logger.error("Unable to parse settings", e);
return;
} else if (cause instanceof NotMasterException) {
logger.error("Node is no longer master", e);
return;
}
}
super.onProcessFileChangesException(e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParser;
import org.junit.After;
import org.junit.Before;
Expand All @@ -55,16 +56,22 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -262,6 +269,68 @@ public void testProcessFileChanges() throws Exception {
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_VERSION_ONLY), any());
}

@SuppressWarnings("unchecked")
public void testInvalidJSON() throws Exception {
doAnswer((Answer<Void>) invocation -> {
invocation.getArgument(1, XContentParser.class).map(); // Throw if JSON is invalid
((Consumer<Exception>) invocation.getArgument(3)).accept(null);
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

CyclicBarrier fileChangeBarrier = new CyclicBarrier(2);
fileSettingsService.addFileChangedListener(() -> awaitOrBust(fileChangeBarrier));

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
boolean returnedNormally = false;
try {
var result = invocation.callRealMethod();
returnedNormally = true;
return result;
} catch (XContentParseException e) {
// We're expecting a parse error. processFileChanges specifies that this is supposed to throw ExecutionException.
throw new ExecutionException(e);
} catch (Throwable e) {
throw new AssertionError("Unexpected exception", e);
} finally {
if (returnedNormally == false) {
// Because of the exception, listeners aren't notified, so we need to activate the barrier ourselves
awaitOrBust(fileChangeBarrier);
}
}
}).when(fileSettingsService).processFileChanges();

// Establish the initial valid JSON
fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
awaitOrBust(fileChangeBarrier);

// Now break the JSON
writeTestFile(fileSettingsService.watchedFile(), "test_invalid_JSON");
awaitOrBust(fileChangeBarrier);

verify(fileSettingsService, times(1)).processFileOnServiceStart(); // The initial state
verify(fileSettingsService, times(1)).processFileChanges(); // The changed state
verify(fileSettingsService, times(1)).onProcessFileChangesException(
argThat(e -> e instanceof ExecutionException && e.getCause() instanceof XContentParseException)
);

// Note: the name "processFileOnServiceStart" is a bit misleading because it is not
// referring to fileSettingsService.start(). Rather, it is referring to the initialization
// of the watcher thread itself, which occurs asynchronously when clusterChanged is first called.
}

private static void awaitOrBust(CyclicBarrier barrier) {
try {
barrier.await(20, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("Unexpected exception waiting for barrier", e);
}
}

@SuppressWarnings("unchecked")
public void testStopWorksInMiddleOfProcessing() throws Exception {
CountDownLatch processFileLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -356,10 +425,10 @@ private static void writeTestFile(Path path, String contents) throws IOException
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
try {
Files.move(tempFilePath, path, ATOMIC_MOVE);
Files.move(tempFilePath, path, REPLACE_EXISTING, ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
logger.info("Atomic move not available. Falling back on non-atomic move to write [{}]", path.toAbsolutePath());
Files.move(tempFilePath, path);
Files.move(tempFilePath, path, REPLACE_EXISTING);
}
}

Expand All @@ -374,4 +443,5 @@ private static void longAwait(CountDownLatch latch) {
fail(e, "longAwait: interrupted waiting for CountDownLatch to reach zero");
}
}

}

0 comments on commit da41a52

Please sign in to comment.