Skip to content

Commit

Permalink
remove synced flush from engines
Browse files Browse the repository at this point in the history
Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Jan 18, 2022
1 parent 6e7b868 commit c0b6125
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public TransportShardFlushAction(
ShardFlushRequest::new,
ThreadPool.Names.FLUSH
);
transportService.registerRequestHandler(
PRE_SYNCED_FLUSH_ACTION_NAME,
ThreadPool.Names.FLUSH,
PreShardSyncedFlushRequest::new,
new PreSyncedFlushTransportHandler(indicesService)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,6 @@ public String getId() {
return id;
}

/**
* A raw version of the commit id (see {@link SegmentInfos#getId()}
*/
public Engine.CommitId getRawCommitId() {
return new Engine.CommitId(Base64.getDecoder().decode(id));
}

/**
* Returns the number of documents in the in this commit
*/
Expand Down
83 changes: 4 additions & 79 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
Expand Down Expand Up @@ -96,7 +93,6 @@
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -121,7 +117,7 @@

public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id";
public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
Expand Down Expand Up @@ -577,22 +573,6 @@ public static class NoOpResult extends Result {

}

/**
* Attempts to do a special commit where the given syncID is put into the commit data. The attempt
* succeeds if there are not pending writes in lucene and the current point is equal to the expected one.
*
* @param syncId id of this sync
* @param expectedCommitId the expected value of
* @return true if the sync commit was made, false o.w.
*/
public abstract SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException;

public enum SyncedFlushResult {
SUCCESS,
COMMIT_MISMATCH,
PENDING_OPERATIONS
}

protected final GetResult getFromSearcher(
Get get,
BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory,
Expand Down Expand Up @@ -1139,20 +1119,17 @@ public boolean refreshNeeded() {
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
* @return the commit Id for the resulting commit
*/
public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
* documents in the lucene index to disk including a potentially heavy and durable fsync operation.
* This operation is not going to block if another flush operation is currently running and won't write
* a lucene commit if nothing needs to be committed.
*
* @return the commit Id for the resulting commit
*/
public final CommitId flush() throws EngineException {
return flush(false, false);
public final void flush() throws EngineException {
flush(false, false);
}

/**
Expand Down Expand Up @@ -1923,58 +1900,6 @@ private void awaitPendingClose() {
}
}

public static class CommitId implements Writeable {

private final byte[] id;

public CommitId(byte[] id) {
assert id != null;
this.id = Arrays.copyOf(id, id.length);
}

/**
* Read from a stream.
*/
public CommitId(StreamInput in) throws IOException {
assert in != null;
this.id = in.readByteArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByteArray(id);
}

@Override
public String toString() {
return Base64.getEncoder().encodeToString(id);
}

public boolean idsEqual(byte[] id) {
return Arrays.equals(id, this.id);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CommitId commitId = (CommitId) o;

return Arrays.equals(id, commitId.id);

}

@Override
public int hashCode() {
return Arrays.hashCode(id);
}
}

public static class IndexCommitRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
Expand Down
104 changes: 11 additions & 93 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1938,71 +1938,6 @@ public void writeIndexingBuffer() throws EngineException {
refresh("write indexing buffer", SearcherScope.INTERNAL, false);
}

@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
// best effort attempt before we acquire locks
ensureOpen();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return SyncedFlushResult.PENDING_OPERATIONS;
}
if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.COMMIT_MISMATCH;
}
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
ensureCanFlush();
// lets do a refresh to make sure we shrink the version map. This refresh will be either a no-op (just shrink the version map)
// or we also have uncommitted changes and that causes this syncFlush to fail.
refresh("sync_flush", SearcherScope.INTERNAL, true);
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return SyncedFlushResult.PENDING_OPERATIONS;
}
if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.COMMIT_MISMATCH;
}
logger.trace("starting sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
return SyncedFlushResult.SUCCESS;
} catch (IOException ex) {
maybeFailEngine("sync commit", ex);
throw new EngineException(shardId, "failed to sync commit", ex);
}
}

final boolean tryRenewSyncCommit() {
boolean renewed = false;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
if (syncId != null
&& indexWriter.hasUncommittedChanges()
&& translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
renewed = true;
}
} catch (IOException ex) {
maybeFailEngine("renew sync commit", ex);
throw new EngineException(shardId, "failed to renew sync commit", ex);
}
if (renewed) {
// refresh outside of the write lock
// we have to refresh internal reader here to ensure we release unreferenced segments.
refresh("renew sync commit", SearcherScope.INTERNAL, true);
}
return renewed;
}

@Override
public boolean shouldPeriodicallyFlush() {
ensureOpen();
Expand Down Expand Up @@ -2042,26 +1977,24 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
throw new IllegalArgumentException(
"wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing
);
}
final byte[] newCommitId;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing) {
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
return new CommitId(lastCommittedSegmentInfos.getId());
if (waitIfOngoing == false) {
return;
}
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
logger.trace("acquired flush lock immediately");
}
Expand All @@ -2081,7 +2014,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
try {
translog.rollGeneration();
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog, null);
commitIndexWriter(indexWriter, translog);
logger.trace("finished commit for flush");

// a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
Expand All @@ -2104,7 +2037,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
refreshLastCommittedSegmentInfos();

}
newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
maybeFailEngine("flush", ex);
throw ex;
Expand All @@ -2117,7 +2049,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
return new CommitId(newCommitId);
}

private void refreshLastCommittedSegmentInfos() {
Expand Down Expand Up @@ -2289,9 +2220,7 @@ public void forceMerge(
this.forceMergeUUID = forceMergeUUID;
}
if (flush) {
if (tryRenewSyncCommit() == false) {
flush(false, true);
}
flush(false, true);
}
if (upgrade) {
logger.info("finished segment upgrade");
Expand Down Expand Up @@ -2682,15 +2611,9 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished
// we try to renew a sync commit which is the case when we are having a big merge after we
// are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
// if we either have records in the translog or if we don't have a sync ID at all...
// maybe even more important, we flush after all merges finish and we are inactive indexing-wise to
// if we have no pending merges and we are supposed to flush once merges have finished to
// free up transient disk usage of the (presumably biggish) segments that were just merged
if (tryRenewSyncCommit() == false) {
flush();
}
flush();
}
});
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
Expand Down Expand Up @@ -2727,10 +2650,8 @@ protected void doRun() throws Exception {
*
* @param writer the index writer to commit
* @param translog the translog
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
Expand All @@ -2747,9 +2668,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,7 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
// we can't do synced flushes this would require an indexWriter which we don't have
throw new UnsupportedOperationException("syncedFlush is not supported on a read-only engine");
}

@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
return new CommitId(lastCommittedSegmentInfos.getId());
}
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}

@Override
public void forceMerge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1321,9 +1321,8 @@ public CompletionStats completionStats(String... fields) {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return the commit ID
*/
public Engine.CommitId flush(FlushRequest request) {
public void flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
Expand All @@ -1334,9 +1333,8 @@ public Engine.CommitId flush(FlushRequest request) {
*/
verifyNotClosed();
final long time = System.nanoTime();
final Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

public class RestSyncedFlushAction extends BaseRestHandler {

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class.getClass());
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class);

@Override
public List<Route> routes() {
Expand Down
Loading

0 comments on commit c0b6125

Please sign in to comment.