-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate Engine with decoupled Translog interfaces #3671
Conversation
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
translogManagerRef = new InternalTranslogManager( | ||
engineConfig.getTranslogConfig(), | ||
engineConfig.getPrimaryTermSupplier(), | ||
engineConfig.getGlobalCheckpointSupplier(), | ||
translogDeletionPolicy, | ||
shardId, | ||
readLock, | ||
() -> getLocalCheckpointTracker(), | ||
translogUUID, | ||
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener)), | ||
this::ensureOpen | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Contemplating introducing a TranslogFactory
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
❌ Gradle Check failure a3d45e8aa43e96c8fba5329be196d2562d4dbda5 |
} catch (IOException e) { | ||
IOUtils.closeWhileHandlingException(store::decRef, readerManager); | ||
Translog translog = null; | ||
if (translogManagerRef != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if TranslogManager
should implement Closeable
as well since it manages closeable resource (like Translog
), wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats a good point however there are multiple places where the Engine is consuming the underlying Translog
directly, which I plan on getting rid of in subsequent PRs as a part of moving Translog to a module(this PR would become too big to review otherwise).
OpenSearch/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
Line 102 in 304d830
Translog getTranslog(boolean ensureOpen); |
Once complete I will make the suggested change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened #3709
@Override | ||
public IndexResult index(Index index) throws IOException { | ||
ensureOpen(); | ||
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false); | ||
final Translog.Location location = translog.add(new Translog.Index(index, indexResult)); | ||
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was not clear to me at the first place why would getTranslog
use boolean argument to check if engine is open or not: in all flows this value is set to false
(please correct me if I am wrong). It seems like only tests use true
as an argument (which could use different assertion for that). I would suggested to clean up interface by removing this confusing parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @reta will make the change, let me know if you have more comments and I can try address them in the next revision
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Bukhtawar I went through a few times, I have nothing else to comment on
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
start gradle check |
@@ -93,7 +93,7 @@ public boolean shouldRollTranslogGeneration() { | |||
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} | |||
|
|||
@Override | |||
public Translog getTranslog(boolean ensureOpen) { | |||
public Translog getTranslog() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI (new change set), the NoOpTranslogManager
is effectively unusable since every single caller assumes it will never return null
for Translog getTranslog()
. Hopefully, NoOpTranslogManager
will never be used but you may think about introducing NoOpTranslog
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct the NoOpTranslog
is already being worked upon as a part of #3600.
start gradle check |
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing this! It's great to see the decoupling work begin. I gave it a quick review. I noticed a lot of this is cosmetic so I just have some validating questions.
One general question. Is the point of having TranslogManager
as an interface because we want to be able to create other translog managers (e.g., kafka manager)?
@@ -167,6 +168,8 @@ public final EngineConfig config() { | |||
return engineConfig; | |||
} | |||
|
|||
public abstract TranslogManager translogManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 Great to see this decoupling
@@ -308,7 +334,9 @@ public InternalEngine(EngineConfig engineConfig) { | |||
} | |||
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); | |||
this.internalReaderManager.addListener(lastRefreshedCheckpointListener); | |||
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); | |||
maxSeqNoOfUpdatesOrDeletes = new AtomicLong( | |||
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big fan of this indirection. I think refactoring utility methods like getMaxSeqNo
directly to the TranslogManager
instead of invoking getTranslog
everywhere is cleaner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: I just noticed the previous review comments. Looks like you're planning to decouple this further in follow up PRs. So disregard.
@@ -759,20 +761,20 @@ public long getProcessedCheckpoint() { | |||
} | |||
|
|||
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { | |||
engine.ensureCanFlush(); // recovered already | |||
engine.translogManager().ensureCanFlush(); // recovered already |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This this indirection to translogManager() temporary and planned to be cleaned up in the follow on PRs?
Thanks @nknize |
Hi @Bukhtawar |
so, yes? 😄
I presume this will create a new I'm trying to glean from this incremental PR if the implementation design is to have 1:1 concrete |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have only a nit, lgtm.
|
||
@Override | ||
public void onTragicFailure(AlreadyClosedException ex) { | ||
failOnTragicEvent(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a nit - I'm wondering if we should leave it up to the engine to determine if the error is tragic or not instead of explicitly catching AlreadyClosedException
in InternalTranslogManager
? Then we don't need this separation in TranslogEventListener
.
The idea is definitely to support streaming stores like kafka and other blob stores to start with. At this point honestly, I am still contemplating if we should introduce a new low level abstraction or continue with TranslogManger as the abstraction itself. I'll be shortly putting forth a proposal and open up a discussion /cc @nknize |
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Will continue discussion over #3242. Merging it as all feedbacks incorporated |
…ct#3671) * Integrate Engine with decoupled translog interface Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Bukhtawar Khan bukhtawa@amazon.com
Description
The PR aims to Integrate Engine with decoupled Translog interfaces introduced in #3638 . This is the second part of introducing new interfaces and implementation as a part of decoupling. The overall proposal can be reviewed at #3471
Breaks down the main decouple PR into #3471 Pluggable translog work
Issues Resolved
#3241
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.