Skip to content

Commit

Permalink
Account soft-deletes in FrozenEngine (#51192) (#51229)
Browse files Browse the repository at this point in the history
Currently, we do not exclude soft-deleted documents when opening index
reader in the FrozenEngine.

Backport of #51192
  • Loading branch information
dnhatn committed Jan 21, 2020
1 parent 6c04815 commit 857d169
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -540,28 +540,6 @@ private static Version indexVersionCreated(final String indexName) throws IOExce
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
this.stats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings())) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
fillSegmentStats(segmentReader, true, stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class ReadOnlyEngine extends Engine {
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field if we are reading form memory maps.
*/
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
private static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.AUTO.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
Expand Down Expand Up @@ -534,4 +535,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}

protected DirectoryReader openDirectory(Directory dir, IndexSettings indexSettings) throws IOException {
final DirectoryReader reader = DirectoryReader.open(dir, OFF_HEAP_READER_ATTRIBUTES);
if (indexSettings.isSoftDeleteEnabled()) {
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
} else {
return reader;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -1144,4 +1145,26 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) thro
}
}, 60, TimeUnit.SECONDS);
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
protected static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public FrozenEngine(EngineConfig config) {

boolean success = false;
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings())) {
canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()),
config.getShardId());
// we record the segment stats here - that's what the reader needs when it's open and it give the user
Expand Down Expand Up @@ -167,7 +167,7 @@ private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOExc
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
listeners.beforeRefresh();
}
final DirectoryReader dirReader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES);
final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory(), engineConfig.getIndexSettings());
reader = lastOpenedReader = wrapReader(dirReader, Function.identity());
processReader(reader);
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.equalTo;

public class FrozenEngineTests extends EngineTestCase {

public void testAcquireReleaseReset() throws IOException {
Expand Down Expand Up @@ -328,4 +330,30 @@ public void testCanMatch() throws IOException {
}
}
}

public void testSearchers() throws Exception {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null,
globalCheckpoint::get, new NoneCircuitBreakerService());
final int totalDocs;
try (InternalEngine engine = createEngine(config)) {
applyOperations(engine, generateHistoryOnReplica(between(10, 1000), false, randomBoolean(), randomBoolean()));
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
engine.syncTranslog();
engine.flush();
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
}
}
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestGetAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
Expand Down Expand Up @@ -304,7 +305,7 @@ public void testRollupAfterRestart() throws Exception {
assertRollUpJob("rollup-job-test");
}
}

public void testSlmStats() throws IOException {
SnapshotLifecyclePolicy slmPolicy = new SnapshotLifecyclePolicy("test-policy", "test-policy", "* * * 31 FEB ? *", "test-repo",
Collections.singletonMap("indices", Collections.singletonList("*")), null);
Expand Down Expand Up @@ -823,4 +824,40 @@ private Map<String, Object> getJob(Map<String, Object> jobsMap, String targetJob
}
return null;
}

public void testFrozenIndexAfterRestarted() throws Exception {
final String index = "test_frozen_index";
if (isRunningAgainstOldCluster()) {
Settings.Builder settings = Settings.builder();
if (minimumNodeVersion().onOrAfter(Version.V_6_5_0) && randomBoolean()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
createIndex(index, settings.build());
ensureGreen(index);
int numDocs = randomIntBetween(10, 500);
for (int i = 0; i < numDocs; i++) {
int id = randomIntBetween(0, 100);
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
assertOK(client().performRequest(indexRequest));
if (rarely()) {
flush(index, randomBoolean());
}
}
} else {
ensureGreen(index);
final int totalHits = (int) XContentMapValues.extractValue("hits.total.value",
entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
assertOK(client().performRequest(new Request("POST", index + "/_freeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter("ignore_throttled", "false");
assertThat(XContentMapValues.extractValue("hits.total.value", entityAsMap(client().performRequest(request))),
equalTo(totalHits));
assertOK(client().performRequest(new Request("POST", index + "/_unfreeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
}
}
}

0 comments on commit 857d169

Please sign in to comment.