From bd7df0236573813644b9f933c206d448d5416337 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Mon, 31 Jul 2023 17:27:08 +0900 Subject: [PATCH] refactor(admin): prevent the large lock from mrmanager This PR changes the role of `internal/admin/mrmanager.(*mrManager).mu`. The large lock in the mrmanager made the mrmanager run sequentially. From now on, however, it guards only the dirty flag for the cluster metadata. Since the metadata repository works concurrently, adopting a large lock on the client side is unnecessary. --- internal/admin/mrmanager/manager.go | 151 ++++++++-------------------- 1 file changed, 44 insertions(+), 107 deletions(-) diff --git a/internal/admin/mrmanager/manager.go b/internal/admin/mrmanager/manager.go index 899971816..c124327b2 100644 --- a/internal/admin/mrmanager/manager.go +++ b/internal/admin/mrmanager/manager.go @@ -85,14 +85,14 @@ var ( type mrManager struct { config - - mu sync.RWMutex connector mrconnector.Connector - - sfg singleflight.Group + sfg singleflight.Group + // mu guards two fields - dirty and updated. + mu sync.RWMutex dirty bool updated time.Time - meta *varlogpb.MetadataDescriptor + // meta is a cached metadata descriptor. 
+ meta *varlogpb.MetadataDescriptor } const ( @@ -145,9 +145,6 @@ func (mrm *mrManager) mc() (mrc.MetadataRepositoryManagementClient, error) { } func (mrm *mrManager) Close() error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - return errors.Wrap(mrm.connector.Close(), "mrmanager") } @@ -171,14 +168,6 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe } func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -190,18 +179,11 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta * return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -213,18 +195,11 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -236,18 +211,11 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) return err } - return err + mrm.setDirty() + return nil } -func 
(mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -259,18 +227,11 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -281,18 +242,12 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl _ = cli.Close() return err } + + mrm.setDirty() return nil } -func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -303,18 +258,12 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types _ = cli.Close() return err } - return err -} -func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() + mrm.setDirty() + return nil +} +func (mrm *mrManager) UpdateLogStream(ctx 
context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -325,40 +274,29 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog _ = cli.Close() return err } - return err + + mrm.setDirty() + return nil } // It implements MetadataRepositoryManager.Seal method. -func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (types.GLSN, error) { cli, err := mrm.c() if err != nil { return types.InvalidGLSN, errors.WithMessage(err, "mrmanager: not accessible") } - if lastCommittedGLSN, err = cli.Seal(ctx, logStreamID); err != nil { + lastCommittedGLSN, err := cli.Seal(ctx, logStreamID) + if err != nil { _ = cli.Close() return types.InvalidGLSN, err } - return lastCommittedGLSN, err -} -func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() + mrm.setDirty() + return lastCommittedGLSN, nil +} +func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -369,13 +307,12 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) _ = cli.Close() return err } - return err + + mrm.setDirty() + return nil } func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) { - mrm.mu.RLock() - defer mrm.mu.RUnlock() - cli, err := mrm.mc() if err != nil { return nil, errors.WithMessage(err, "mrmanager: not accessible") @@ -386,13 +323,10 @@ func (mrm *mrManager) GetClusterInfo(ctx context.Context) 
(*mrpb.ClusterInfo, er _ = cli.Close() return nil, err } - return rsp.GetClusterInfo(), err + return rsp.GetClusterInfo(), nil } func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL, rpcURL string) error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - cli, err := mrm.mc() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -411,9 +345,6 @@ func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL, } func (mrm *mrManager) RemovePeer(ctx context.Context, nodeID types.NodeID) error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - cli, err := mrm.mc() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -473,3 +404,9 @@ func (mrm *mrManager) StorageNode(ctx context.Context, storageNodeID types.Stora func (mrm *mrManager) NumberOfMR() int { return mrm.connector.NumberOfMR() } + +func (mrm *mrManager) setDirty() { + mrm.mu.Lock() + mrm.dirty = true + mrm.mu.Unlock() +}