Skip to content

Commit

Permalink
Warn on slow metadata persistence (#47130)
Browse files Browse the repository at this point in the history
Today if metadata persistence is excessively slow on a master-ineligible node
then the `ClusterApplierService` emits a warning indicating that the
`GatewayMetaState` applier was slow, but gives no further details. If it is
excessively slow on a master-eligible node then we do not see any warning at
all, although we might see other consequences such as a lagging node or a
master failure.

With this commit we emit a warning if metadata persistence takes longer than a
configurable threshold, which defaults to `10s`. We also emit statistics that
record how much index metadata was persisted and how much was skipped since
this can help distinguish cases where IO was slow from cases where there are
simply too many indices involved.

Backport of #47005.
  • Loading branch information
DaveCTurner authored Sep 26, 2019
1 parent fcddaa9 commit 45c7783
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -245,6 +246,7 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
NetworkModule.HTTP_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ public void start(Settings settings, TransportService transportService, ClusterS
throw new ElasticsearchException("failed to load metadata", e);
}
final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
transportService.getThreadPool()::relativeTimeInMillis);

if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) {
// only for tests that simulate mixed Zen1/Zen2 clusters, see Zen1IT
if (isMasterOrDataNode(settings)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
*/
package org.elasticsearch.gateway;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;

import java.util.ArrayList;
Expand All @@ -33,11 +39,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

/**
* Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
*/
class IncrementalClusterStateWriter {
public class IncrementalClusterStateWriter {

private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);

public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);

private final MetaStateService metaStateService;

Expand All @@ -46,13 +58,24 @@ class IncrementalClusterStateWriter {
// no need to synchronize access to these fields.
private Manifest previousManifest;
private ClusterState previousClusterState;
private final LongSupplier relativeTimeMillisSupplier;
private boolean incrementalWrite;

IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
private volatile TimeValue slowWriteLoggingThreshold;

IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest,
ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) {
this.metaStateService = metaStateService;
this.previousManifest = manifest;
this.previousClusterState = clusterState;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.incrementalWrite = false;
this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}

private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

void setCurrentTerm(long currentTerm) throws WriteStateException {
Expand Down Expand Up @@ -85,14 +108,26 @@ void setIncrementalWrite(boolean incrementalWrite) {
void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
MetaData newMetaData = newState.metaData();

final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();

final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
long globalStateGeneration = writeGlobalState(writer, newMetaData);
Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
writeManifest(writer, manifest);

previousManifest = manifest;
previousClusterState = newState;

final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
"wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
} else {
logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
}
}

private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
Expand Down Expand Up @@ -256,6 +291,9 @@ static class AtomicClusterStateWriter {
private final MetaStateService metaStateService;
private boolean finished;

private int indicesWritten;
private int indicesSkipped;

AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
this.metaStateService = metaStateService;
assert previousManifest != null;
Expand Down Expand Up @@ -320,6 +358,22 @@ void rollback() {
rollbackCleanupActions.forEach(Runnable::run);
finished = true;
}

void incrementIndicesWritten() {
indicesWritten++;
}

void incrementIndicesSkipped() {
indicesSkipped++;
}

int getIndicesWritten() {
return indicesWritten;
}

int getIndicesSkipped() {
return indicesSkipped;
}
}

static class KeepPreviousGeneration implements IndexMetaDataAction {
Expand All @@ -338,6 +392,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) {
writer.incrementIndicesSkipped();
return generation;
}
}
Expand All @@ -356,6 +411,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
writer.incrementIndicesWritten();
return writer.writeIndex("freshly created", indexMetaData);
}
}
Expand All @@ -376,6 +432,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
writer.incrementIndicesWritten();
return writer.writeIndex(
"version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
newIndexMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,35 @@
*/
package org.elasticsearch.gateway;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
Expand All @@ -48,15 +57,18 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
Expand Down Expand Up @@ -250,20 +262,28 @@ public void testResolveStatesToBeWritten() throws WriteStateException {

assertThat(actions, hasSize(3));

boolean keptPreviousGeneration = false;
boolean wroteNewIndex = false;
boolean wroteChangedIndex = false;

for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
assertThat(action.execute(writer), equalTo(3L));
verifyZeroInteractions(writer);
verify(writer, times(1)).incrementIndicesSkipped();
verifyNoMoreInteractions(writer);
keptPreviousGeneration = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
assertThat(action.execute(writer), equalTo(0L));
verify(writer, times(1)).incrementIndicesWritten();
wroteNewIndex = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
Expand All @@ -273,10 +293,16 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
assertThat(action.execute(writer), equalTo(3L));
ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
verify(writer, times(1)).incrementIndicesWritten();
assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
wroteChangedIndex = true;
}
}

assertTrue(keptPreviousGeneration);
assertTrue(wroteNewIndex);
assertTrue(wroteChangedIndex);
}

private static class MetaStateServiceWithFailures extends MetaStateService {
Expand Down Expand Up @@ -426,4 +452,84 @@ public void testAtomicityWithFailures() throws IOException {
assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
}
}

@TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
public void testSlowLogging() throws WriteStateException, IllegalAccessException {
final long slowWriteLoggingThresholdMillis;
final Settings settings;
if (randomBoolean()) {
slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
settings = Settings.EMPTY;
} else {
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
settings = Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.build();
}

final DiscoveryNode localNode = newNode("node");
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();

final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
final AtomicLong currentTime = new AtomicLong(startTimeMillis);
final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class),
new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()),
clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get()));

assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning above threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation(
"should not see warning below threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"*"));

clusterSettings.applySettings(Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.build());
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at reduced threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow
}

private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter,
MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);
Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
Loggers.addAppender(classLogger, mockAppender);

try {
incrementalClusterStateWriter.updateClusterState(clusterState, clusterState);
} finally {
Loggers.removeAppender(classLogger, mockAppender);
mockAppender.stop();
}
mockAppender.assertAllExpectationsMatched();
}
}
Loading

0 comments on commit 45c7783

Please sign in to comment.