perf: mpool: Implement transactionLk to ensure consistent state access in MessagePool #10865
base: master
…formance
Description: This commit introduces significant refinements to the locking mechanism in the MessagePool. The primary objective is to reduce the duration that locks impede RPC calls from accessing data, thereby enhancing the overall performance of various MessagePool operations.
Key Changes include:
1. Introduction of a new lock, transactionLk. This lock plays a crucial role in improving concurrency control in the MessagePool. It works in conjunction with the existing lock (previously known as curTsLk) to ensure proper synchronization when accessing both pending messages and current tipset related data.
2. The existing lock, curTsLk, has been renamed to stateLk, to better reflect its purpose. This lock's usage has been refined to optimize its performance.
3. Locking and unlocking procedures have been updated in several functions, including but not limited to CheckMessages, CheckPendingMessages, CheckReplaceMessages, checkMessages, New, TryForEachPendingMessage, addTs, addLoaded, addSkipChecks, GetNonce, Pending, PendingFor, HeadChange, Clear, pruneExcessMessages, and republishPendingMessages.
The integration of these changes aims to improve the efficiency of the MessagePool, particularly in terms of handling RPC calls, and to boost the overall performance of the system.
//
// mp.stateLk.Lock()
// defer mp.stateLk.Unlock()
stateLk sync.RWMutex
It would simplify the code a lot IMO if we could merge the transactionLk/stateLk into one lock or lock object, is that something that you considered?
@snissn What was the resolution to this?
I think this was commented on before @fridrik01 got further into reading the PR, and there was a subsequent comment from him clarifying. It is necessary to have two locks for this PR.
Co-authored-by: Friðrik Ásmundsson <fridrik01@gmail.com>
A few more questions after the earlier review in #10833
@@ -449,8 +470,8 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
}

func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error {
	mp.lk.Lock()
	defer mp.lk.Unlock()
	mp.transactionLk.Lock()
I think it's sufficient to lock mp.stateLk here, since we just need pending to not change under our feet?
The transaction lock must be held to safely make changes to state, so any change to the state must be wrapped in the transaction lock. Hope that makes sense! Also, the protector is the caller of this function, so I am pretty sure it wants write access rather than read access, which is why I used a write lock here.
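As an illustration of the reply above, here is a minimal sketch of the two-lock idea. It is not the Lotus implementation: the type twoLockPool, its fields, and the methods add and nonce are hypothetical stand-ins, and the split of responsibilities between the two locks is an assumption drawn from this thread.

```go
package main

import (
	"fmt"
	"sync"
)

// twoLockPool is a hypothetical, heavily simplified stand-in for MessagePool.
type twoLockPool struct {
	// transactionLk serializes whole write operations (assumed role).
	transactionLk sync.Mutex
	// stateLk guards the fields below; readers only need RLock.
	stateLk sync.RWMutex
	pending map[string]int // sender -> number of pending messages
}

func newTwoLockPool() *twoLockPool {
	return &twoLockPool{pending: make(map[string]int)}
}

// add is a writer. It holds transactionLk for the whole operation and
// takes stateLk in write mode only around the actual mutation.
func (p *twoLockPool) add(sender string) int {
	p.transactionLk.Lock()
	defer p.transactionLk.Unlock()

	// Slow work that does not touch shared fields could run here
	// without blocking readers, because stateLk is not held yet.

	p.stateLk.Lock()
	defer p.stateLk.Unlock()
	p.pending[sender]++
	return p.pending[sender]
}

// nonce is a reader (think of an RPC call). It never takes transactionLk.
func (p *twoLockPool) nonce(sender string) int {
	p.stateLk.RLock()
	defer p.stateLk.RUnlock()
	return p.pending[sender]
}

func main() {
	p := newTwoLockPool()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.add("sender-a")
			_ = p.nonce("sender-a")
		}()
	}
	wg.Wait()
	fmt.Println(p.nonce("sender-a")) // prints 100
}
```

In this sketch readers contend only with the short write section under stateLk, while writers queue on transactionLk; whether the same holds for every method touched by the PR is exactly what the review questions below probe.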
defer mp.transactionLk.Unlock()

mp.stateLk.RLock()
err := mp.checkMessage(ctx, m)
I think we can refactor checkMessage to not need to reference curTs at all -- I can push up a commit that does that, and I think in that case we can change this to only take the stateLk after having called checkMessage.
That sounds great to me! Have you gotten to that yet? Should we set up a new ticket for that?
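The refactor proposed above can be sketched roughly as follows: run the checks that need no shared state first, and take the lock only for the short state-dependent part. Everything here is hypothetical (sketchMessage, checkedPool, checkStateless, the size limit); the real checkMessage validates different things.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sketchMessage is a hypothetical, minimal message type.
type sketchMessage struct {
	From  string
	Nonce uint64
	Size  int
}

// maxSketchSize is an arbitrary limit chosen for this illustration.
const maxSketchSize = 64

// checkedPool tracks the next expected nonce per sender.
type checkedPool struct {
	stateLk sync.RWMutex
	next    map[string]uint64
}

func newCheckedPool() *checkedPool {
	return &checkedPool{next: make(map[string]uint64)}
}

// checkStateless validates a message without touching pool state,
// so it can run before any lock is taken.
func checkStateless(m sketchMessage) error {
	if m.From == "" {
		return errors.New("missing sender")
	}
	if m.Size > maxSketchSize {
		return errors.New("message too large")
	}
	return nil
}

// add runs the stateless checks first and only then takes stateLk
// for the nonce check and the update.
func (p *checkedPool) add(m sketchMessage) error {
	if err := checkStateless(m); err != nil {
		return err
	}

	p.stateLk.Lock()
	defer p.stateLk.Unlock()
	if want := p.next[m.From]; m.Nonce != want {
		return fmt.Errorf("nonce %d, expected %d", m.Nonce, want)
	}
	p.next[m.From] = m.Nonce + 1
	return nil
}

func main() {
	p := newCheckedPool()
	fmt.Println(p.add(sketchMessage{From: "a", Nonce: 0, Size: 10}))
	fmt.Println(p.add(sketchMessage{From: "a", Nonce: 5, Size: 10}))
	fmt.Println(p.add(sketchMessage{From: "a", Nonce: 1, Size: 999}))
}
```

The design point is that a rejected message never touches the lock at all, and an accepted one holds it only for a map lookup and a write.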
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, local, untrusted bool) (bool, error) {
	//ensures that we have a consistent view of the state
	mp.transactionLk.Lock()
It's not immediately clear to me why we need the transactionLk here, and in some of the subsequent methods, since we aren't modifying curTs. I can totally believe it's necessary, but can you please talk me through it?
Hey! That's a good question! When the tip set changes, the message pool is pruned to remove any messages that were in the blocks:
// caller must hold transactionLk and stateLk
func (mp *MessagePool) headChange(ctx context.Context, revert []*types.TipSet, apply []*types.TipSet) error {
	repubTrigger := false
	rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
	add := func(m *types.SignedMessage) {
		s, ok := rmsgs[m.Message.From]
		if !ok {
			s = make(map[uint64]*types.SignedMessage)
			rmsgs[m.Message.From] = s
		}
		s[m.Message.Nonce] = m
	}
	rm := func(from address.Address, nonce uint64) {
		s, ok := rmsgs[from]
		if !ok {
			mp.remove(ctx, from, nonce, true)
			return
		}
		if _, ok := s[nonce]; ok {
			delete(s, nonce)
			return
		}
		mp.remove(ctx, from, nonce, true)
	}
	...
	for _, ts := range apply {
		mp.curTs = ts
		for _, b := range ts.Blocks() {
			bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
			if err != nil {
				xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
				log.Errorf("error retrieving messages for block: %s", xerr)
				merr = multierror.Append(merr, xerr)
				continue
			}
			for _, msg := range smsgs {
				rm(msg.Message.From, msg.Message.Nonce)
				maybeRepub(msg.Cid())
			}
			for _, msg := range bmsgs {
				rm(msg.From, msg.Nonce)
				maybeRepub(msg.Cid())
			}
		}
	}
We want to wrap changes to the tipset and/or the messages in a pair of transaction and state locks: a tipset change also modifies the message pool, so any change to the message pool needs a lock that coordinates it with tipset changes.
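A small sketch of the coordination described in this reply, under the assumption that both adding a message and applying a head change take the two locks in the same order (transaction lock first, then state lock). The type headPool and its methods are hypothetical and far simpler than the headChange excerpt quoted above.

```go
package main

import (
	"fmt"
	"sync"
)

// headPool is a hypothetical miniature pool: one head height and a set
// of pending messages keyed by nonce.
type headPool struct {
	transactionLk sync.Mutex
	stateLk       sync.RWMutex
	curHead       int
	pending       map[uint64]string
}

func newHeadPool() *headPool {
	return &headPool{pending: make(map[uint64]string)}
}

// add inserts a pending message. It takes transactionLk before stateLk.
func (p *headPool) add(nonce uint64, payload string) {
	p.transactionLk.Lock()
	defer p.transactionLk.Unlock()
	p.stateLk.Lock()
	defer p.stateLk.Unlock()
	p.pending[nonce] = payload
}

// headChange moves the head and prunes messages that were included in
// the new blocks. It uses the same lock order as add, so the two can
// never deadlock against each other and never interleave.
func (p *headPool) headChange(newHead int, included []uint64) {
	p.transactionLk.Lock()
	defer p.transactionLk.Unlock()
	p.stateLk.Lock()
	defer p.stateLk.Unlock()
	p.curHead = newHead
	for _, n := range included {
		delete(p.pending, n)
	}
}

// snapshot returns the head and the pending count as one consistent pair.
func (p *headPool) snapshot() (head int, pendingCount int) {
	p.stateLk.RLock()
	defer p.stateLk.RUnlock()
	return p.curHead, len(p.pending)
}

func main() {
	p := newHeadPool()
	p.add(0, "m0")
	p.add(1, "m1")
	p.add(2, "m2")
	p.headChange(1, []uint64{0, 1})
	fmt.Println(p.snapshot()) // prints 1 1
}
```

Because a reader takes stateLk in read mode, it sees either the old head with the old pending set or the new head with the pruned set, never a mix of the two.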
mp.curTsLk.Lock()
ts := mp.curTs
mp.curTsLk.Unlock()
mp.transactionLk.Lock()
Also not clear to me that we need transactionLk here, but unsure
Same as above: we are changing the message pool, so we want to hold both the transaction and state locks, because we are changing either the tipset or the messages.
While this does simplify things, it also makes some of the locks significantly coarser-grained. Are we sure it improves performance?
mp.transactionLk.Lock()
defer mp.transactionLk.Unlock()

mp.stateLk.Lock()
defer mp.stateLk.Unlock()
We will block sync entirely waiting on both of these locks. Is that fine?
mp.stateLk.RLock()
defer mp.stateLk.RUnlock()
This locks a much wider range. Are we sure that's a good idea?
Title: Refine MessagePool Locking Mechanism to Optimize RPC Calls Performance
Description:
This commit introduces significant refinements to the locking mechanism in the MessagePool. The primary objective is to reduce the duration that locks impede RPC calls from accessing data, thereby enhancing the overall performance of various MessagePool operations.
Key Changes include:
1. Introduction of a new lock, transactionLk. This lock plays a crucial role in improving concurrency control in the MessagePool. It works in conjunction with the existing lock (previously known as curTsLk) to ensure proper synchronization when accessing both pending messages and current tipset related data.
2. The existing lock, curTsLk, has been renamed to stateLk, to better reflect its purpose. This lock's usage has been refined to optimize its performance.
3. Locking and unlocking procedures have been updated in several functions, including but not limited to CheckMessages, CheckPendingMessages, CheckReplaceMessages, checkMessages, New, TryForEachPendingMessage, addTs, addLoaded, addSkipChecks, GetNonce, Pending, PendingFor, HeadChange, Clear, pruneExcessMessages, and republishPendingMessages.
The integration of these changes aims to improve the efficiency of the MessagePool, particularly in terms of handling RPC calls, and to boost the overall performance of the system.
Related Issues
squashed from #10833
Proposed Changes
Additional Info
Checklist
Before you mark the PR ready for review, please make sure that:
<PR type>: <area>: <change being made>
fix: mempool: Introduce a cache for valid signatures
PR type: fix, feat, build, chore, ci, docs, perf, refactor, revert, style, test
area, e.g. api, chain, state, market, mempool, multisig, networking, paych, proving, sealing, wallet, deps