Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reprocess operator file settings on service start #114295

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
0c193cd
Reprocess file settings on start
n1v0lg Oct 8, 2024
1cc6175
Clean up and fixes
n1v0lg Oct 10, 2024
4e243c4
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 10, 2024
4fab8f6
Merge branch 'main' into reprocess-file-settings-on-start
elasticmachine Oct 11, 2024
80036ee
Clean up
n1v0lg Oct 11, 2024
7242520
IT
n1v0lg Oct 11, 2024
2fd6eef
Smaller diff
n1v0lg Oct 11, 2024
4319bb6
Nit
n1v0lg Oct 11, 2024
574ba67
More clean up
n1v0lg Oct 11, 2024
fd4d20c
Missed a file
n1v0lg Oct 11, 2024
75a255d
More clean and tests
n1v0lg Oct 11, 2024
55f5399
WIP better conditions
n1v0lg Oct 11, 2024
d790a5e
Test fixes
n1v0lg Oct 11, 2024
5f41735
More unit tests
n1v0lg Oct 11, 2024
28938d5
WIP error handling
n1v0lg Oct 11, 2024
d6d9c43
Fix concurrency in tests
n1v0lg Oct 11, 2024
fa65684
More
n1v0lg Oct 11, 2024
77421c7
New error
n1v0lg Oct 11, 2024
16db4ed
Merge branch 'main' into reprocess-file-settings-on-start
elasticmachine Oct 14, 2024
1fae6be
Version fix
n1v0lg Oct 14, 2024
8d3539b
Merge branch 'reprocess-file-settings-on-start' of github.com:n1v0lg/…
n1v0lg Oct 14, 2024
7a02f9b
Update docs/changelog/114295.yaml
n1v0lg Oct 14, 2024
ff796cf
Changelog
n1v0lg Oct 14, 2024
e478134
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 14, 2024
74ab91a
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 14, 2024
9a44ebf
Upgrade test
n1v0lg Oct 14, 2024
89b0957
Spacing
n1v0lg Oct 14, 2024
dda0723
WIP review feedback
n1v0lg Oct 15, 2024
8dc3709
Fix test
n1v0lg Oct 15, 2024
496ab5c
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 15, 2024
ae00eaa
Fold reprocessSameVersion into version metadata
n1v0lg Oct 15, 2024
99e2b0e
Refactor
n1v0lg Oct 16, 2024
b7608cb
Parens
n1v0lg Oct 16, 2024
6ba1336
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 16, 2024
035e8c0
Fix compilation
n1v0lg Oct 16, 2024
7e2720d
Error version checks
n1v0lg Oct 16, 2024
051f3c6
Tweak
n1v0lg Oct 16, 2024
7af83cb
More clean up
n1v0lg Oct 16, 2024
e1a1de2
More
n1v0lg Oct 16, 2024
c0540bf
Inject version check
n1v0lg Oct 18, 2024
cf8002a
Smaller diff
n1v0lg Oct 18, 2024
a5d1bc8
More
n1v0lg Oct 18, 2024
d20b1d3
Rename
n1v0lg Oct 18, 2024
1409c89
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 18, 2024
fbf0691
Fix
n1v0lg Oct 18, 2024
afef613
Javadoc clean up
n1v0lg Oct 18, 2024
cb84cfb
Naming is hard
n1v0lg Oct 18, 2024
997ac7a
Log message
n1v0lg Oct 18, 2024
a09b505
Match description with actual thing
n1v0lg Oct 18, 2024
8548c93
Still more fixes
n1v0lg Oct 18, 2024
4205feb
Urgh imports
n1v0lg Oct 18, 2024
65d05b4
Fix test
n1v0lg Oct 18, 2024
dbbf948
Merge branch 'main' into reprocess-file-settings-on-start
n1v0lg Oct 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/114295.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114295
summary: "Reprocess operator file settings when settings service starts, due to node restart or master node change"
area: Infra/Settings
type: enhancement
issues: [ ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.upgrades;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.client.Request;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class FileSettingsRoleMappingUpgradeIT extends ParameterizedRollingUpgradeTestCase {
Copy link
Contributor Author

@n1v0lg n1v0lg Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't actually run on 9.0, only once these changes are backported to 8.x -- I have a PR open to give this some CI time (also checked that it passes locally).


private static final String settingsJSON = """
{
"metadata": {
"version": "1",
"compatibility": "8.4.0"
},
"state": {
"role_mappings": {
"everyone_kibana": {
"enabled": true,
"roles": [ "kibana_user" ],
"rules": { "field": { "username": "*" } }
}
}
}
}""";

private static final TemporaryFolder repoDirectory = new TemporaryFolder();

private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.version(getOldClusterTestVersion())
.nodes(NODE_NUM)
.setting("path.repo", new Supplier<>() {
@Override
@SuppressForbidden(reason = "TemporaryFolder only has io.File methods, not nio.File")
public String get() {
return repoDirectory.getRoot().getPath();
}
})
.setting("xpack.security.enabled", "true")
// workaround to avoid having to set up clients and authorization headers
.setting("xpack.security.authc.anonymous.roles", "superuser")
.configFile("operator/settings.json", Resource.fromString(settingsJSON))
.build();

@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(repoDirectory).around(cluster);

public FileSettingsRoleMappingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}

@Override
protected ElasticsearchCluster getUpgradeCluster() {
return cluster;
}

@Before
public void checkVersions() {
assumeTrue(
"Only relevant when upgrading from a version before role mappings were stored in cluster state",
oldClusterHasFeature("gte_v8.4.0") && oldClusterHasFeature("gte_v8.15.0") == false
);
}

public void testRoleMappingsAppliedOnUpgrade() throws IOException {
if (isOldCluster()) {
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
List<Object> roleMappings = new XContentTestUtils.JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))).get(
"metadata.role_mappings.role_mappings"
);
assertThat(roleMappings, is(nullValue()));
} else if (isUpgradedCluster()) {
// the nodes have all been upgraded. Check they re-processed the role mappings in the settings file on
// upgrade
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
List<Object> roleMappings = new XContentTestUtils.JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))).get(
"metadata.role_mappings.role_mappings"
);
assertThat(roleMappings, is(not(nullValue())));
assertThat(roleMappings.size(), equalTo(1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -40,6 +41,7 @@
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterNode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -50,7 +52,12 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class FileSettingsServiceIT extends ESIntegTestCase {

private static final AtomicLong versionCounter = new AtomicLong(1);
private final AtomicLong versionCounter = new AtomicLong(1);

@Before
public void resetVersionCounter() {
versionCounter.set(1);
n1v0lg marked this conversation as resolved.
Show resolved Hide resolved
}

private static final String testJSON = """
{
Expand Down Expand Up @@ -102,15 +109,29 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
}
}""";

private static final String testOtherErrorJSON = """
{
"metadata": {
"version": "%s",
"compatibility": "8.4.0"
},
"state": {
"bad_cluster_settings": {
"search.allow_expensive_queries": "false"
}
}
}""";

private void assertMasterNode(Client client, String node) {
assertThat(
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
equalTo(node)
);
}

public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger) throws Exception {
long version = versionCounter.incrementAndGet();
public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger, boolean incrementVersion)
throws Exception {
long version = incrementVersion ? versionCounter.incrementAndGet() : versionCounter.get();

FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);

Expand All @@ -124,6 +145,15 @@ public static void writeJSONFile(String node, String json, AtomicLong versionCou
logger.info("--> After writing new settings file: [{}]", settingsFileContent);
}

public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger) throws Exception {
writeJSONFile(node, json, versionCounter, logger, true);
}

public static void writeJSONFileWithoutVersionIncrement(String node, String json, AtomicLong versionCounter, Logger logger)
throws Exception {
writeJSONFile(node, json, versionCounter, logger, false);
}

private Tuple<CountDownLatch, AtomicLong> setupCleanupClusterStateListener(String node) {
ClusterService clusterService = internalCluster().clusterService(node);
CountDownLatch savedClusterState = new CountDownLatch(1);
Expand Down Expand Up @@ -171,7 +201,10 @@ public void clusterChanged(ClusterChangedEvent event) {
private void assertClusterStateSaveOK(CountDownLatch savedClusterState, AtomicLong metadataVersion, String expectedBytesPerSec)
throws Exception {
assertTrue(savedClusterState.await(20, TimeUnit.SECONDS));
assertExpectedRecoveryBytesSettingAndVersion(metadataVersion, expectedBytesPerSec);
}

private static void assertExpectedRecoveryBytesSettingAndVersion(AtomicLong metadataVersion, String expectedBytesPerSec) {
final ClusterStateResponse clusterStateResponse = clusterAdmin().state(
new ClusterStateRequest(TEST_REQUEST_TIMEOUT).waitForMetadataVersion(metadataVersion.get())
).actionGet();
Expand Down Expand Up @@ -337,6 +370,77 @@ public void testErrorSaved() throws Exception {
assertClusterStateNotSaved(savedClusterState.v1(), savedClusterState.v2());
}

public void testErrorCanRecoverOnRestart() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode(
Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
);
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
var savedClusterState = setupClusterStateListenerForError(masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

writeJSONFile(masterNode, testErrorJSON, versionCounter, logger);
AtomicLong metadataVersion = savedClusterState.v2();
assertClusterStateNotSaved(savedClusterState.v1(), metadataVersion);
assertHasErrors(metadataVersion, "not_cluster_settings");

// write valid json without version increment to simulate ES being able to process settings after a restart (usually, this would be
// due to a code change)
writeJSONFileWithoutVersionIncrement(masterNode, testJSON, versionCounter, logger);
internalCluster().restartNode(masterNode);
ensureGreen();

// we don't know the exact metadata version to wait for so rely on an assertBusy instead
assertBusy(() -> assertExpectedRecoveryBytesSettingAndVersion(metadataVersion, "50mb"));
assertBusy(() -> assertNoErrors(metadataVersion));
}

public void testNewErrorOnRestartReprocessing() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode(
Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
);
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
var savedClusterState = setupClusterStateListenerForError(masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

writeJSONFile(masterNode, testErrorJSON, versionCounter, logger);
AtomicLong metadataVersion = savedClusterState.v2();
assertClusterStateNotSaved(savedClusterState.v1(), metadataVersion);
assertHasErrors(metadataVersion, "not_cluster_settings");

// write json with new error without version increment to simulate ES failing to process settings after a restart for a new reason
// (usually, this would be due to a code change)
writeJSONFileWithoutVersionIncrement(masterNode, testOtherErrorJSON, versionCounter, logger);
assertHasErrors(metadataVersion, "not_cluster_settings");
internalCluster().restartNode(masterNode);
ensureGreen();

assertBusy(() -> assertHasErrors(metadataVersion, "bad_cluster_settings"));
}

public void testSettingsAppliedOnMasterReElection() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start master node");
Expand Down Expand Up @@ -383,4 +487,21 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
}

private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
assertThat(errorMetadata, is(notNullValue()));
assertThat(errorMetadata.errors(), containsInAnyOrder(containsString(expectedError)));
}

private void assertNoErrors(AtomicLong waitForMetadataVersion) {
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
assertThat(errorMetadata, is(nullValue()));
}

private ReservedStateErrorMetadata getErrorMetadata(AtomicLong waitForMetadataVersion) {
final ClusterStateResponse clusterStateResponse = clusterAdmin().state(
new ClusterStateRequest(TEST_REQUEST_TIMEOUT).waitForMetadataVersion(waitForMetadataVersion.get())
).actionGet();
return clusterStateResponse.getState().getMetadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE).errorMetadata();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ public AbstractFileWatchingService(Path watchedFile) {

protected abstract void processInitialFileMissing() throws InterruptedException, ExecutionException, IOException;

/**
* Defaults to generic {@link #processFileChanges()} behavior.
* An implementation can override this to define different file handling when the file is processed during
* initial service start.
*/
protected void processFileOnServiceStart() throws IOException, ExecutionException, InterruptedException {
processFileChanges();
}

public final void addFileChangedListener(FileChangedListener listener) {
eventListeners.add(listener);
}
Expand Down Expand Up @@ -174,7 +183,7 @@ protected final void watcherThread() {

if (Files.exists(path)) {
logger.debug("found initial operator settings file [{}], applying...", path);
processSettingsAndNotifyListeners();
processSettingsOnServiceStartAndNotifyListeners();
} else {
processInitialFileMissing();
// Notify everyone we don't have any initial file settings
Expand Down Expand Up @@ -290,6 +299,17 @@ final WatchKey enableDirectoryWatcher(WatchKey previousKey, Path settingsDir) th
} while (true);
}

void processSettingsOnServiceStartAndNotifyListeners() throws InterruptedException {
try {
processFileOnServiceStart();
for (var listener : eventListeners) {
listener.watchedFileChanged();
}
} catch (IOException | ExecutionException e) {
logger.error(() -> "Error processing watched file: " + watchedFile(), e);
}
}

void processSettingsAndNotifyListeners() throws InterruptedException {
try {
processFileChanges();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,22 @@

import static org.elasticsearch.ExceptionsHelper.stackTrace;

record ErrorState(String namespace, Long version, List<String> errors, ReservedStateErrorMetadata.ErrorKind errorKind) {
ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) {
this(namespace, version, List.of(stackTrace(e)), errorKind);
record ErrorState(
String namespace,
Long version,
ReservedStateVersionCheck versionCheck,
List<String> errors,
ReservedStateErrorMetadata.ErrorKind errorKind
) {

ErrorState(
String namespace,
Long version,
ReservedStateVersionCheck versionCheck,
Exception e,
ReservedStateErrorMetadata.ErrorKind errorKind
) {
this(namespace, version, versionCheck, List.of(stackTrace(e)), errorKind);
}

public String toString() {
Expand Down
Loading