-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow _update and upsert to read from the transaction log #29264
Changes from all commits
a8d3b73
3fa0fc0
41a3c32
3b84354
5be76c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,7 @@ | |
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
|
@@ -145,6 +146,7 @@ public class InternalEngine extends Engine { | |
* being indexed/deleted. | ||
*/ | ||
private final AtomicLong writingBytes = new AtomicLong(); | ||
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); | ||
|
||
@Nullable | ||
private final String historyUUID; | ||
|
@@ -558,6 +560,27 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search | |
throw new VersionConflictEngineException(shardId, get.type(), get.id(), | ||
get.versionType().explainConflictForReads(versionValue.version, get.version())); | ||
} | ||
if (get.isReadFromTranslog()) { | ||
// this is only used for updates - API _GET calls will always read form a reader for consistency | ||
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 | ||
if (versionValue.getLocation() != null) { | ||
try { | ||
Translog.Operation operation = translog.readOperation(versionValue.getLocation()); | ||
if (operation != null) { | ||
// in the case of a already pruned translog generation we might get null here - yet very unlikely | ||
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig | ||
.getIndexSettings().getIndexVersionCreated()); | ||
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)), | ||
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0)); | ||
} | ||
} catch (IOException e) { | ||
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In other places, we have wrapped the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's bogus I think. There is no exception throw from maybeFaileEngine only errors which should not be handled. I think we are good here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will clean this up in a followup There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, makes sense |
||
throw new EngineException(shardId, "failed to read operation from translog", e); | ||
} | ||
} else { | ||
trackTranslogLocation.set(true); | ||
} | ||
} | ||
refresh("realtime_get", SearcherScope.INTERNAL); | ||
} | ||
scope = SearcherScope.INTERNAL; | ||
|
@@ -790,6 +813,10 @@ public IndexResult index(Index index) throws IOException { | |
} | ||
indexResult.setTranslogLocation(location); | ||
} | ||
if (plan.indexIntoLucene && indexResult.hasFailure() == false) { | ||
versionMap.maybePutUnderLock(index.uid().bytes(), | ||
getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); | ||
} | ||
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); | ||
} | ||
|
@@ -916,8 +943,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) | |
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); | ||
index(index.docs(), indexWriter); | ||
} | ||
versionMap.maybePutUnderLock(index.uid().bytes(), | ||
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); | ||
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); | ||
} catch (Exception ex) { | ||
if (indexWriter.getTragicException() == null) { | ||
|
@@ -941,6 +966,13 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) | |
} | ||
} | ||
|
||
private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { | ||
if (location != null && trackTranslogLocation.get()) { | ||
return new TranslogVersionValue(location, version, seqNo, term); | ||
} | ||
return new VersionValue(version, seqNo, term); | ||
} | ||
|
||
/** | ||
* returns true if the indexing operation may have already be processed by this engine. | ||
* Note that it is OK to rarely return true even if this is not the case. However a `false` | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when do we expect this to be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the translog generation is not available anymore I think it’s unlikely but I see a chance. I can add a comment in the code