diff --git a/internal/admin/mrmanager/manager.go b/internal/admin/mrmanager/manager.go index 899971816..c124327b2 100644 --- a/internal/admin/mrmanager/manager.go +++ b/internal/admin/mrmanager/manager.go @@ -85,14 +85,14 @@ var ( type mrManager struct { config - - mu sync.RWMutex connector mrconnector.Connector - - sfg singleflight.Group + sfg singleflight.Group + // mu guards two fields - dirty and updated. + mu sync.RWMutex dirty bool updated time.Time - meta *varlogpb.MetadataDescriptor + // meta is a cached metadata descriptor. + meta *varlogpb.MetadataDescriptor } const ( @@ -145,9 +145,6 @@ func (mrm *mrManager) mc() (mrc.MetadataRepositoryManagementClient, error) { } func (mrm *mrManager) Close() error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - return errors.Wrap(mrm.connector.Close(), "mrmanager") } @@ -171,14 +168,6 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe } func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -190,18 +179,11 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta * return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -213,18 +195,11 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -236,18 +211,11 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -259,18 +227,11 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID return err } - return err + mrm.setDirty() + return nil } -func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -281,18 +242,12 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl _ = cli.Close() return err } + + mrm.setDirty() return nil } -func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -303,18 +258,12 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types _ = cli.Close() return err } - return err -} -func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() + mrm.setDirty() + return nil +} +func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -325,40 +274,29 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog _ = cli.Close() return err } - return err + + mrm.setDirty() + return nil } // It implements MetadataRepositoryManager.Seal method. -func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() - +func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (types.GLSN, error) { cli, err := mrm.c() if err != nil { return types.InvalidGLSN, errors.WithMessage(err, "mrmanager: not accessible") } - if lastCommittedGLSN, err = cli.Seal(ctx, logStreamID); err != nil { + lastCommittedGLSN, err := cli.Seal(ctx, logStreamID) + if err != nil { _ = cli.Close() return types.InvalidGLSN, err } - return lastCommittedGLSN, err -} -func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) { - mrm.mu.Lock() - defer func() { - if err == nil { - mrm.dirty = true - } - mrm.mu.Unlock() - }() + mrm.setDirty() + return lastCommittedGLSN, nil +} +func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error { cli, err := mrm.c() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -369,13 +307,12 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) _ = cli.Close() return err } - return err + + mrm.setDirty() + return nil } func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) { - mrm.mu.RLock() - defer mrm.mu.RUnlock() - cli, err := mrm.mc() if err != nil { return nil, errors.WithMessage(err, "mrmanager: not accessible") @@ -386,13 +323,10 @@ func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, er _ = cli.Close() return nil, err } - return rsp.GetClusterInfo(), err + return rsp.GetClusterInfo(), nil } func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL, rpcURL string) error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - cli, err := mrm.mc() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -411,9 +345,6 @@ func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL, } func (mrm *mrManager) RemovePeer(ctx context.Context, nodeID types.NodeID) error { - mrm.mu.Lock() - defer mrm.mu.Unlock() - cli, err := mrm.mc() if err != nil { return errors.WithMessage(err, "mrmanager: not accessible") @@ -473,3 +404,9 @@ func (mrm *mrManager) StorageNode(ctx context.Context, storageNodeID types.Stora func (mrm *mrManager) NumberOfMR() int { return mrm.connector.NumberOfMR() } + +func (mrm *mrManager) setDirty() { + mrm.mu.Lock() + mrm.dirty = true + mrm.mu.Unlock() +}