diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 52bbe2e93ce..0a76c202de2 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -28,7 +28,7 @@ import ( func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1) - if c.closed { + if c.closed.Load() { return nil, ErrConnectionClosed } @@ -74,7 +74,7 @@ type cLeaderParticipation struct { // WaitForLeadership is part of the topo.LeaderParticipation interface. func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } @@ -122,7 +122,7 @@ func (mp *cLeaderParticipation) Stop() { // GetCurrentLeaderID is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return "", ErrConnectionClosed } @@ -141,7 +141,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, // WaitForNewLeader is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index 0545ba8b182..afce7868469 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -116,7 +116,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error { } func (c *Conn) unlock(ctx context.Context, dirPath string) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 55057b62468..abf99e760c3 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -25,6 +25,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -134,13 +135,13 @@ type Conn struct { factory *Factory cell string serverAddr string - closed bool + closed atomic.Bool } // dial returns immediately, unless the Conn points to the sentinel // UnreachableServerAddr, in which case it will block until the context expires. func (c *Conn) dial(ctx context.Context) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } if c.serverAddr == UnreachableServerAddr { @@ -153,7 +154,7 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { c.factory.callstats.Add([]string{"Close"}, 1) - c.closed = true + c.closed.Store(true) } type watch struct { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 8d9ef5cb54c..3651bcca9ce 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -27,7 +27,7 @@ import ( func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { c.factory.callstats.Add([]string{"Watch"}, 1) - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed } @@ -79,7 +79,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { c.factory.callstats.Add([]string{"WatchRecursive"}, 1) - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed }