Skip to content

Commit

Permalink
Skip shard refreshes if shard is search idle (#27500)
Browse files Browse the repository at this point in the history
Today we refresh automatically in the background by default very second.
This default behavior has a significant impact on indexing performance
if the refreshes are not needed.
This change introduces a notion of a shard being `search idle` which a
shard transitions to after (default) `30s` without any access to an
external searcher. Once a shard is search idle all scheduled refreshes
will be skipped unless there are any refresh listeners registered.
If a search happens on a `serach idle` shard the search request _park_
on a refresh listener and will be executed once the next scheduled refresh
occurs. This will also turn the shard into the `non-idle` state immediately.

This behavior is only applied if there is no explicit refresh interval set.
  • Loading branch information
s1monw committed Nov 27, 2017
1 parent 8d6bfe5 commit f23ed61
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -86,6 +88,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}

@Override
protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException {
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -38,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
Expand Down Expand Up @@ -76,6 +78,23 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}

@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -47,6 +48,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
Expand Down Expand Up @@ -78,7 +81,7 @@ protected TransportSingleShardAction(Settings settings, String actionName, Threa
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}

/**
Expand All @@ -97,6 +100,19 @@ protected void doExecute(Request request, ActionListener<Response> listener) {

protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(this.executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
}
});
}
protected abstract Response newResponse();

protected abstract boolean resolveIndex(Request request);
Expand Down Expand Up @@ -291,11 +307,27 @@ public void messageReceived(final Request request, final TransportChannel channe
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (IOException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
}

/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
Expand Down Expand Up @@ -82,6 +85,23 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, ActionListener<TermVectorsResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}

@Override
protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
40 changes: 29 additions & 11 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -624,6 +625,27 @@ public synchronized void updateMetaData(final IndexMetaData metadata) {
}
}
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search reqeusts
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after interval change", e);
}

@Override
protected void doRun() throws Exception {
maybeRefreshEngine(true);
}

@Override
public boolean isForceExecution() {
return true;
}
});
rescheduleRefreshTasks();
}
final Translog.Durability durability = indexSettings.getTranslogDurability();
Expand Down Expand Up @@ -686,17 +708,13 @@ private void maybeFSyncTranslogs() {
}
}

private void maybeRefreshEngine() {
if (indexSettings.getRefreshInterval().millis() > 0) {
private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
if (shard.isReadAllowed()) {
try {
if (shard.isRefreshNeeded()) {
shard.refresh("schedule");
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
try {
shard.scheduledRefresh();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
}
}
Expand Down Expand Up @@ -896,7 +914,7 @@ final class AsyncRefreshTask extends BaseAsyncTask {

@Override
protected void runInternal() {
indexService.maybeRefreshEngine();
indexService.maybeRefreshEngine(false);
}

@Override
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING =
new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(),
(value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope);
Expand Down Expand Up @@ -262,6 +265,8 @@ public final class IndexSettings {
private volatile int maxNgramDiff;
private volatile int maxShingleDiff;
private volatile boolean TTLPurgeDisabled;
private volatile TimeValue searchIdleAfter;

/**
* The maximum number of refresh listeners allows on this shard.
*/
Expand Down Expand Up @@ -371,6 +376,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered
if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) {
throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
Expand Down Expand Up @@ -411,8 +417,11 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }

private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}
Expand Down Expand Up @@ -752,4 +761,16 @@ public IndexSortConfig getIndexSortConfig() {
}

public IndexScopedSettings getScopedSettings() { return scopedSettings;}

/**
* Returns true iff the refresh setting exists or in other words is explicitly set.
*/
public boolean isExplicitRefresh() {
return INDEX_REFRESH_INTERVAL_SETTING.exists(settings);
}

/**
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
}
Loading

0 comments on commit f23ed61

Please sign in to comment.