diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 868a2c53287..ad173695099 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -26,7 +26,7 @@ import ( // NewLeaderParticipation is part of the topo.Server interface func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { - if c.closed { + if c.closed.Load() { return nil, ErrConnectionClosed } @@ -72,7 +72,7 @@ type cLeaderParticipation struct { // WaitForLeadership is part of the topo.LeaderParticipation interface. func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } @@ -120,7 +120,7 @@ func (mp *cLeaderParticipation) Stop() { // GetCurrentLeaderID is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return "", ErrConnectionClosed } @@ -139,7 +139,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, // WaitForNewLeader is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index c15fb9099bb..5c2a2462495 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error { } func (c *Conn) unlock(ctx context.Context, dirPath string) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 504f1d4bd39..c9967c8863c 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -25,6 +25,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "time" "vitess.io/vitess/go/vt/log" @@ -124,14 +125,14 @@ type Conn struct { factory *Factory cell string serverAddr string - closed bool + closed atomic.Bool } // dial returns immediately, unless the Conn points to the sentinel // UnreachableServerAddr, in which case it will block until the context expires // and return the context's error. func (c *Conn) dial(ctx context.Context) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } if c.serverAddr == UnreachableServerAddr { @@ -144,7 +145,7 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { - c.closed = true + c.closed.Store(true) } type watch struct { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 73b2d248434..0f245c95b5f 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,7 +25,7 @@ import ( // Watch is part of the topo.Conn interface. func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed } @@ -75,7 +75,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed }