Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Lucene operations in engine explicitly #29357

Merged
merged 1 commit into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public class InternalEngine extends Engine {
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
// Lucene operations since this engine was opened - not include operations from existing segments.
private final CounterMetric numDocDeletes = new CounterMetric();
private final CounterMetric numDocAppends = new CounterMetric();
private final CounterMetric numDocUpdates = new CounterMetric();

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
Expand Down Expand Up @@ -907,11 +912,11 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try {
if (plan.useLuceneUpdateDocument) {
update(index.uid(), index.docs(), indexWriter);
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
index(index.docs(), indexWriter);
addDocs(index.docs(), indexWriter);
}
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
Expand Down Expand Up @@ -968,12 +973,13 @@ long getMaxSeqNoOfNonAppendOnlyOperations() {
return maxSeqNoOfNonAppendOnlyOperations.get();
}

private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
} else {
indexWriter.addDocument(docs.get(0));
}
numDocAppends.inc(docs.size());
}

private static final class IndexingStrategy {
Expand Down Expand Up @@ -1054,12 +1060,13 @@ private boolean assertDocDoesNotExist(final Index index, final boolean allowDele
return true;
}

private static void update(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.updateDocuments(uid, docs);
} else {
indexWriter.updateDocument(uid, docs.get(0));
}
numDocUpdates.inc(docs.size());
}

@Override
Expand Down Expand Up @@ -1188,6 +1195,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
// any exception that comes from this is a either an ACE or a fatal exception there
// can't be any document failures coming from this
indexWriter.deleteDocuments(delete.uid());
numDocDeletes.inc();
}
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
Expand Down Expand Up @@ -2205,13 +2213,28 @@ boolean isSafeAccessRequired() {
return versionMap.isSafeAccessRequired();
}

/**
* Returns the number of documents have been deleted since this engine was opened.
* This count does not include the deletions from the existing segments before opening engine.
*/
long getNumDocDeletes() {
return numDocDeletes.count();
}

/**
* Returns the number of documents have been appended since this engine was opened.
* This count does not include the appends from the existing segments before opening engine.
*/
long getNumDocAppends() {
return numDocAppends.count();
}

/**
* Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
* in the index.
* Returns the number of documents have been updated since this engine was opened.
* This count does not include the updates from the existing segments before opening engine.
*/
boolean indexWriterHasDeletions() {
return indexWriter.hasDeletions();
long getNumDocUpdates() {
return numDocUpdates.count();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2939,21 +2939,21 @@ public void testDoubleDeliveryPrimary() throws IOException {
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertTrue(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertTrue(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 0, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertTrue(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 0, 2, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
Expand Down Expand Up @@ -3000,23 +3000,23 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
engine.delete(delete);
assertEquals(1, engine.getNumVersionLookups());
assertTrue(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 1);
Engine.IndexResult retryResult = engine.index(retry);
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
engine.delete(delete);
assertTrue(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 1);
assertEquals(2, engine.getNumVersionLookups());
Engine.IndexResult indexResult = engine.index(operation);
assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups());
Expand All @@ -3041,21 +3041,29 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertEquals(retry.seqNo() > operation.seqNo(), engine.indexWriterHasDeletions());
if (retry.seqNo() > operation.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertEquals(operation.seqNo() > retry.seqNo(), engine.indexWriterHasDeletions());
if (operation.seqNo() > retry.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
Expand Down Expand Up @@ -3096,27 +3104,27 @@ public void testDoubleDeliveryReplica() throws IOException {
Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
if (randomBoolean()) {
engine.refresh("test");
}
Engine.IndexResult retryResult = engine.index(duplicate);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(duplicate);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
if (randomBoolean()) {
engine.refresh("test");
}
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
Expand Down Expand Up @@ -3278,10 +3286,11 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
}
if (primary) {
// primaries rely on lucene dedup and may index the same document twice
assertTrue(engine.indexWriterHasDeletions());
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
} else {
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
assertFalse(engine.indexWriterHasDeletions());
assertLuceneOperations(engine, numDocs, 0, 0);
}
}

Expand Down Expand Up @@ -3377,8 +3386,7 @@ public void run() {
}
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
assertFalse(engine.indexWriterHasDeletions());

assertLuceneOperations(engine, numDocs, 0, 0);
}

public static long getNumVersionLookups(InternalEngine engine) { // for other tests to access this
Expand Down Expand Up @@ -4659,4 +4667,13 @@ private static void trimUnsafeCommits(EngineConfig config) throws IOException {
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
}

void assertLuceneOperations(InternalEngine engine, long expectedAppends, long expectedUpdates, long expectedDeletes) {
String message = "Lucene operations mismatched;" +
" appends [actual:" + engine.getNumDocAppends() + ", expected:" + expectedAppends + "]," +
" updates [actual:" + engine.getNumDocUpdates() + ", expected:" + expectedUpdates + "]," +
" deletes [actual:" + engine.getNumDocDeletes() + ", expected:" + expectedDeletes + "]";
assertThat(message, engine.getNumDocAppends(), equalTo(expectedAppends));
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates));
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes));
}
}