diff --git a/internal/admin/mrmanager/manager.go b/internal/admin/mrmanager/manager.go index c3bb60154..899971816 100644 --- a/internal/admin/mrmanager/manager.go +++ b/internal/admin/mrmanager/manager.go @@ -170,10 +170,12 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe return meta, err } -func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) error { +func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -182,7 +184,8 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta * return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.RegisterStorageNode(ctx, storageNodeMeta); err != nil { + err = cli.RegisterStorageNode(ctx, storageNodeMeta) + if err != nil { _ = cli.Close() return err } @@ -190,10 +193,12 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta * return err } -func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error { +func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -202,7 +207,8 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.UnregisterStorageNode(ctx, storageNodeID); err != nil { + err = cli.UnregisterStorageNode(ctx, storageNodeID) + if err != nil { _ = cli.Close() return err } @@ -210,10 +216,12 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t return err } -func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error { +func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -222,7 +230,8 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.RegisterTopic(ctx, topicID); err != nil { + err = cli.RegisterTopic(ctx, topicID) + if err != nil { _ = cli.Close() return err } @@ -230,10 +239,12 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) return err } -func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error { +func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -242,7 +253,8 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.UnregisterTopic(ctx, topicID); err != nil { + err = cli.UnregisterTopic(ctx, topicID) + if err != nil { _ = cli.Close() return err } @@ -250,10 +262,12 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID return err } -func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { +func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -262,17 +276,20 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.RegisterLogStream(ctx, logStreamDesc); err != nil { + err = cli.RegisterLogStream(ctx, logStreamDesc) + if err != nil { _ = cli.Close() return err } return nil } -func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error { +func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -281,17 +298,20 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.UnregisterLogStream(ctx, logStreamID); err != nil { + err = cli.UnregisterLogStream(ctx, logStreamID) + if err != nil { _ = cli.Close() return err } return err } -func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error { +func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -300,7 +320,8 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.UpdateLogStream(ctx, logStreamDesc); err != nil { + err = cli.UpdateLogStream(ctx, logStreamDesc) + if err != nil { _ = cli.Close() return err } @@ -311,7 +332,9 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -327,10 +350,12 @@ func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) ( return lastCommittedGLSN, err } -func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error { +func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) { mrm.mu.Lock() defer func() { - mrm.dirty = true + if err == nil { + mrm.dirty = true + } mrm.mu.Unlock() }() @@ -339,7 +364,8 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) return errors.WithMessage(err, "mrmanager: not accessible") } - if err := cli.Unseal(ctx, logStreamID); err != nil { + err = cli.Unseal(ctx, logStreamID) + if err != nil { _ = cli.Close() return err } @@ -347,8 +373,8 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) } func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) { - mrm.mu.Lock() - defer mrm.mu.Unlock() + mrm.mu.RLock() + defer mrm.mu.RUnlock() cli, err := mrm.mc() if err != nil {