Skip to content

Commit

Permalink
Addressed feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc D'Mello committed Oct 31, 2024
1 parent d8ace65 commit 707d214
Show file tree
Hide file tree
Showing 36 changed files with 322 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexUpgrader;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception {
// add dummy segments (which are all in current
// version) to single segment index
MergePolicy mp = random().nextBoolean() ? newLogMergePolicy() : newTieredMergePolicy();
IndexWriterConfig iwc = new IndexWriterConfig(null).setMergePolicy(mp);
IndexWriterConfig iwc = new IndexWriterConfig((Analyzer) null).setMergePolicy(mp);
IndexWriter w = new IndexWriter(directory, iwc);
w.addIndexes(ramDir);
try (w) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.file.Path;
import java.util.Properties;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.benchmark.BenchmarkTestCase;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
Expand Down Expand Up @@ -45,7 +46,7 @@ public static void beforeClassAddIndexesTaskTest() throws Exception {
inputDir = testDir.resolve("input");
Directory tmpDir = newFSDirectory(inputDir);
try {
IndexWriter writer = new IndexWriter(tmpDir, new IndexWriterConfig(null));
IndexWriter writer = new IndexWriter(tmpDir, new IndexWriterConfig((Analyzer) null));
for (int i = 0; i < 10; i++) {
writer.addDocument(new Document());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.lucene.index;

import java.io.IOException;

/**
* Default {@link FlushPolicy} implementation that flushes new segments based on RAM used and
* document count depending on the IndexWriter's {@link IndexWriterConfig}. It also applies pending
Expand Down Expand Up @@ -52,6 +54,17 @@ && flushOnDocCount()
}
}

@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {
long totalBytes = perWriterRamManager.getTotalBufferBytesUsed();
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
ramManager.flushRoundRobin();
}
}

private void flushDeletes(DocumentsWriterFlushControl control) {
control.setApplyAllDeletes();
if (infoStream.isEnabled("FP")) {
Expand Down
12 changes: 12 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;

import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;

Expand Down Expand Up @@ -57,6 +58,17 @@ abstract class FlushPolicy {
public abstract void onChange(
DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);

/**
* Chooses which writer should be flushed. Default implementation chooses the writer with most RAM
* usage
*
* @param ramManager the {@link IndexWriterRAMManager} being used to actually flush the writers
*/
public abstract void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException;

/** Called by DocumentsWriter to initialize the FlushPolicy */
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {
this.indexWriterConfig = indexWriterConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.CommandLineUtil;
Expand Down Expand Up @@ -135,7 +136,7 @@ static IndexUpgrader parseArgs(String[] args) throws IOException {
* {@code matchVersion}. The tool refuses to upgrade indexes with multiple commit points.
*/
public IndexUpgrader(Directory dir) {
this(dir, new IndexWriterConfig(null), false);
this(dir, new IndexWriterConfig((Analyzer) null), false);
}

/**
Expand All @@ -145,7 +146,7 @@ public IndexUpgrader(Directory dir) {
* be sent to this stream.
*/
public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits) {
this(dir, new IndexWriterConfig(null), deletePriorCommits);
this(dir, new IndexWriterConfig((Analyzer) null), deletePriorCommits);
if (null != infoStream) {
this.iwc.setInfoStream(infoStream);
}
Expand Down
38 changes: 6 additions & 32 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -941,14 +941,11 @@ protected final void ensureOpen() throws AlreadyClosedException {
* @param d the index directory. The index is either created or appended according <code>
* conf.getOpenMode()</code>.
* @param conf the configuration settings according to which IndexWriter should be initialized.
* @param indexWriterRAMManager The RAM manager used for multi-tenant RAM management
* @throws IOException if the directory cannot be read/written to, or if it does not exist and
* <code>conf.getOpenMode()</code> is <code>OpenMode.APPEND</code> or if there is any other
* low-level IO error
*/
public IndexWriter(
Directory d, IndexWriterConfig conf, IndexWriterRAMManager indexWriterRAMManager)
throws IOException {
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
enableTestPoints = isEnableTestPoints();
conf.setIndexWriter(this); // prevent reuse by other instances
config = conf;
Expand Down Expand Up @@ -1216,27 +1213,9 @@ public IndexWriter(
writeLock = null;
}
}

if (indexWriterRAMManager != null) {
this.indexWriterRAMManager =
new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(this, indexWriterRAMManager);
} else {
this.indexWriterRAMManager = null;
}
}

/**
* Constructor for IndexWriter's that don't require multi-tenant RAM management
*
* @param d the index directory. The index is either created or appended according <code>
* conf.getOpenMode()</code>.
* @param conf the configuration settings according to which IndexWriter should be initialized.
* @throws IOException if the directory cannot be read/written to, or if it does not exist and
* <code>conf.getOpenMode()</code> is <code>OpenMode.APPEND</code> or if there is any other
* low-level IO error
*/
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
this(d, conf, null);
this.indexWriterRAMManager =
new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(
this, config.getIndexWriterRAMManager());
}

/** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */
Expand Down Expand Up @@ -1366,10 +1345,6 @@ private void shutdown() throws IOException {
}
rollbackInternal(); // if we got that far lets rollback and close
}

if (indexWriterRAMManager != null) {
indexWriterRAMManager.removeWriter();
}
}

/**
Expand All @@ -1395,6 +1370,7 @@ private void shutdown() throws IOException {
*/
@Override
public void close() throws IOException {
indexWriterRAMManager.removeWriter();
if (config.getCommitOnClose()) {
shutdown();
} else {
Expand Down Expand Up @@ -6042,9 +6018,7 @@ private long maybeProcessEvents(long seqNo) throws IOException {
seqNo = -seqNo;
processEvents(true);
}
if (indexWriterRAMManager != null) {
indexWriterRAMManager.flushIfNecessary();
}
indexWriterRAMManager.flushIfNecessary(config.flushPolicy);
return seqNo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,21 @@ public IndexWriterConfig() {
* problem you should switch to {@link LogByteSizeMergePolicy} or {@link LogDocMergePolicy}.
*/
public IndexWriterConfig(Analyzer analyzer) {
super(analyzer);
this(analyzer, new IndexWriterRAMManager(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB));
}

/**
* Creates a new config with the provided {@link IndexWriterRAMManager}. If you want to share a
* buffer between multiple {@link IndexWriter}, you will need to use this constructor as {@link
* IndexWriterConfig} maintains a 1:1 relationship with {@link IndexWriter}
*/
public IndexWriterConfig(IndexWriterRAMManager indexWriterRAMManager) {
this(new StandardAnalyzer(), indexWriterRAMManager);
}

/** Creates a new config with the provided {@link Analyzer} and {@link IndexWriterRAMManager} */
public IndexWriterConfig(Analyzer analyzer, IndexWriterRAMManager indexWriterRAMManager) {
super(analyzer, indexWriterRAMManager);
}

/**
Expand Down Expand Up @@ -393,6 +407,11 @@ public double getRAMBufferSizeMB() {
return super.getRAMBufferSizeMB();
}

@Override
public IndexWriterRAMManager getIndexWriterRAMManager() {
return super.getIndexWriterRAMManager();
}

/**
* Information about merges, deletes and a message when maxFieldLength is reached will be printed
* to this. Must not be null, but {@link InfoStream#NO_OUTPUT} may be used to suppress output.
Expand Down
Loading

0 comments on commit 707d214

Please sign in to comment.