Skip to content

Commit

Permalink
Utilize typed atomics (#1407)
Browse files Browse the repository at this point in the history
* Utilize atomic.Bool

* Use atomic.Pointer

* Address comments
  • Loading branch information
Stefan-Ethernal committed Apr 25, 2023
1 parent 6cb9d24 commit 93ac4d0
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 87 deletions.
18 changes: 4 additions & 14 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ type Blockchain struct {
// any new fields from being added
receiptsCache *lru.Cache // LRU cache for the block receipts

currentHeader atomic.Value // The current header
currentDifficulty atomic.Value // The current difficulty of the chain (total difficulty)
currentHeader atomic.Pointer[types.Header] // The current header
currentDifficulty atomic.Pointer[big.Int] // The current difficulty of the chain (total difficulty)

stream *eventStream // Event subscriptions

Expand Down Expand Up @@ -312,22 +312,12 @@ func (b *Blockchain) setCurrentHeader(h *types.Header, diff *big.Int) {

// Header returns the current header (atomic)
func (b *Blockchain) Header() *types.Header {
header, ok := b.currentHeader.Load().(*types.Header)
if !ok {
return nil
}

return header
return b.currentHeader.Load()
}

// CurrentTD returns the current total difficulty (atomic)
func (b *Blockchain) CurrentTD() *big.Int {
td, ok := b.currentDifficulty.Load().(*big.Int)
if !ok {
return nil
}

return td
return b.currentDifficulty.Load()
}

// Config returns the blockchain configuration
Expand Down
10 changes: 3 additions & 7 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type consensusRuntime struct {
lastBuiltBlock *types.Header

// activeValidatorFlag indicates whether the given node is amongst currently active validator set
activeValidatorFlag uint32
activeValidatorFlag atomic.Bool

// checkpointManager represents abstraction for checkpoint submission
checkpointManager CheckpointManager
Expand Down Expand Up @@ -557,16 +557,12 @@ func (c *consensusRuntime) GetStateSyncProof(stateSyncID uint64) (types.Proof, e

// setIsActiveValidator updates the activeValidatorFlag field
func (c *consensusRuntime) setIsActiveValidator(isActiveValidator bool) {
if isActiveValidator {
atomic.StoreUint32(&c.activeValidatorFlag, 1)
} else {
atomic.StoreUint32(&c.activeValidatorFlag, 0)
}
c.activeValidatorFlag.Store(isActiveValidator)
}

// isActiveValidator indicates if node is in validator set or not
func (c *consensusRuntime) isActiveValidator() bool {
return atomic.LoadUint32(&c.activeValidatorFlag) == 1
return c.activeValidatorFlag.Load()
}

// isFixedSizeOfEpochMet checks if epoch reached its end that was configured by its default size
Expand Down
20 changes: 10 additions & 10 deletions consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ func TestConsensusRuntime_FSM_NotInValidatorSet(t *testing.T) {
Key: createTestKey(t),
}
runtime := &consensusRuntime{
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
activeValidatorFlag: 1,
config: config,
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
config: config,
epoch: &epochMetadata{
Number: 1,
Validators: validators.getPublicIdentities(),
},
lastBuiltBlock: &types.Header{},
}
runtime.setIsActiveValidator(true)

err := runtime.FSM()
assert.ErrorIs(t, err, errNotAValidator)
Expand Down Expand Up @@ -340,10 +340,9 @@ func TestConsensusRuntime_FSM_NotEndOfEpoch_NotEndOfSprint(t *testing.T) {
blockchain: blockchainMock,
}
runtime := &consensusRuntime{
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
logger: hclog.NewNullLogger(),
activeValidatorFlag: 1,
config: config,
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
logger: hclog.NewNullLogger(),
config: config,
epoch: &epochMetadata{
Number: 1,
Validators: validators.getPublicIdentities(),
Expand All @@ -354,6 +353,7 @@ func TestConsensusRuntime_FSM_NotEndOfEpoch_NotEndOfSprint(t *testing.T) {
stateSyncManager: &dummyStateSyncManager{},
checkpointManager: &dummyCheckpointManager{},
}
runtime.setIsActiveValidator(true)

err := runtime.FSM()
require.NoError(t, err)
Expand Down Expand Up @@ -512,9 +512,8 @@ func TestConsensusRuntime_restartEpoch_SameEpochNumberAsTheLastOne(t *testing.T)
blockchain: blockchainMock,
}
runtime := &consensusRuntime{
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
activeValidatorFlag: 1,
config: config,
proposerCalculator: NewProposerCalculatorFromSnapshot(snapshot, config, hclog.NewNullLogger()),
config: config,
epoch: &epochMetadata{
Number: 1,
Validators: validatorSet,
Expand All @@ -524,6 +523,7 @@ func TestConsensusRuntime_restartEpoch_SameEpochNumberAsTheLastOne(t *testing.T)
Number: originalBlockNumber,
},
}
runtime.setIsActiveValidator(true)

epoch, err := runtime.restartEpoch(newCurrentHeader)

Expand Down
6 changes: 3 additions & 3 deletions e2e-polybft/framework/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type node struct {
shuttingDown uint64
shuttingDown atomic.Bool
cmd *exec.Cmd
doneCh chan struct{}
exitResult *exitResult
Expand Down Expand Up @@ -52,7 +52,7 @@ func (n *node) run() {
}

func (n *node) IsShuttingDown() bool {
return atomic.LoadUint64(&n.shuttingDown) == 1
return n.shuttingDown.Load()
}

func (n *node) Stop() error {
Expand All @@ -65,7 +65,7 @@ func (n *node) Stop() error {
return err
}

atomic.StoreUint64(&n.shuttingDown, 1)
n.shuttingDown.Store(true)
<-n.Wait()

return nil
Expand Down
8 changes: 4 additions & 4 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Topic struct {
topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
closed *uint64
closed atomic.Bool
waitGroup sync.WaitGroup
}

Expand All @@ -40,7 +40,7 @@ func (t *Topic) createObj() proto.Message {
}

func (t *Topic) Close() {
if atomic.SwapUint64(t.closed, 1) > 0 {
if t.closed.Swap(true) {
// Already closed.
return
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (t *Topic) Subscribe(handler func(obj interface{}, from peer.ID)) error {
}

// Mark topic active.
atomic.StoreUint64(t.closed, 0)
t.closed.Store(false)

go t.readLoop(sub, handler)

Expand Down Expand Up @@ -126,8 +126,8 @@ func (s *Server) NewTopic(protoID string, obj proto.Message) (*Topic, error) {
topic: topic,
typ: reflect.TypeOf(obj).Elem(),
closeCh: make(chan struct{}),
closed: new(uint64),
}
tt.closed.Store(false)

return tt, nil
}
1 change: 0 additions & 1 deletion network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func TestSimpleGossip(t *testing.T) {
func Test_RepeatedClose(t *testing.T) {
topic := &Topic{
closeCh: make(chan struct{}),
closed: new(uint64),
}

// Call Close() twice to ensure that underlying logic (e.g. channel close) is
Expand Down
7 changes: 3 additions & 4 deletions syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type syncPeerClient struct {

shouldEmitBlocks bool // flag for emitting blocks in the topic
closeCh chan struct{}
closed *uint64 // ACTIVE == 0, CLOSED == non-zero.
closed atomic.Bool

peerStatusUpdateChLock sync.Mutex
peerStatusUpdateChClosed bool
Expand All @@ -58,7 +58,6 @@ func NewSyncPeerClient(
peerConnectionUpdateCh: make(chan *event.PeerEvent, 1),
shouldEmitBlocks: true,
closeCh: make(chan struct{}),
closed: new(uint64),

peerStatusUpdateChLock: sync.Mutex{},
peerStatusUpdateChClosed: false,
Expand All @@ -68,7 +67,7 @@ func NewSyncPeerClient(
// Start processes for SyncPeerClient
func (m *syncPeerClient) Start() error {
// Mark client active.
atomic.StoreUint64(m.closed, 0)
m.closed.Store(false)

go m.startNewBlockProcess()
go m.startPeerEventProcess()
Expand All @@ -82,7 +81,7 @@ func (m *syncPeerClient) Start() error {

// Close terminates running processes for SyncPeerClient
func (m *syncPeerClient) Close() {
if atomic.SwapUint64(m.closed, 1) > 0 {
if m.closed.Swap(true) {
// Already closed.
return
}
Expand Down
3 changes: 2 additions & 1 deletion syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func newTestSyncPeerClient(network Network, blockchain Blockchain) *syncPeerClie
id: network.AddrInfo().ID.String(),
peerStatusUpdateCh: make(chan *NoForkPeer, 1),
peerConnectionUpdateCh: make(chan *event.PeerEvent, 1),
closed: new(uint64),
}

// need to register protocol
Expand Down Expand Up @@ -577,10 +576,12 @@ func Test_EmitMultipleBlocks(t *testing.T) {

waitForGossip := func(wg *sync.WaitGroup) bool {
c := make(chan struct{})

go func() {
defer close(c)
wg.Wait()
}()

select {
case <-c:
return true
Expand Down
13 changes: 6 additions & 7 deletions txpool/queue_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// All methods assume the (correct) lock is held.
type accountQueue struct {
sync.RWMutex
wLock uint32
wLock atomic.Bool
queue minNonceQueue
}

Expand All @@ -27,18 +27,17 @@ func newAccountQueue() *accountQueue {
}

func (q *accountQueue) lock(write bool) {
switch write {
case true:
if write {
q.Lock()
atomic.StoreUint32(&q.wLock, 1)
case false:
} else {
q.RLock()
atomic.StoreUint32(&q.wLock, 0)
}

q.wLock.Store(write)
}

func (q *accountQueue) unlock() {
if atomic.SwapUint32(&q.wLock, 0) == 1 {
if q.wLock.Swap(false) {
q.Unlock()
} else {
q.RUnlock()
Expand Down
22 changes: 4 additions & 18 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ type TxPool struct {

// flag indicating if the current node is a sealer,
// and should therefore gossip transactions
sealing uint32
sealing atomic.Bool

// baseFee is the base fee of the current head.
// This is needed to sort transactions by price
Expand Down Expand Up @@ -338,21 +338,7 @@ func (p *TxPool) SetSigner(s signer) {

// SetSealing sets the sealing flag
func (p *TxPool) SetSealing(sealing bool) {
newValue := uint32(0)
if sealing {
newValue = 1
}

atomic.CompareAndSwapUint32(
&p.sealing,
p.sealing,
newValue,
)
}

// sealing returns the current set sealing flag
func (p *TxPool) getSealing() bool {
return atomic.LoadUint32(&p.sealing) == 1
p.sealing.CompareAndSwap(p.sealing.Load(), sealing)
}

// AddTx adds a new transaction to the pool (sent from json-RPC/gRPC endpoints)
Expand Down Expand Up @@ -576,7 +562,7 @@ func (p *TxPool) processEvent(event *blockchain.Event) {
// reset accounts with the new state
p.resetAccounts(stateNonces)

if !p.getSealing() {
if !p.sealing.Load() {
// only non-validator cleanup inactive accounts
p.updateAccountSkipsCounts(stateNonces)
}
Expand Down Expand Up @@ -843,7 +829,7 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) {
// addGossipTx handles receiving transactions
// gossiped by the network.
func (p *TxPool) addGossipTx(obj interface{}, _ peer.ID) {
if !p.getSealing() {
if !p.sealing.Load() {
return
}

Expand Down
6 changes: 3 additions & 3 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2836,9 +2836,9 @@ func TestSetSealing(t *testing.T) {
assert.NoError(t, err)

// Set initial value
pool.sealing = 0
pool.sealing.Store(false)
if test.initialValue {
pool.sealing = 1
pool.sealing.Store(true)
}

// call the target
Expand All @@ -2848,7 +2848,7 @@ func TestSetSealing(t *testing.T) {
assert.Equal(
t,
test.expectedValue,
pool.getSealing(),
pool.sealing.Load(),
)
})
}
Expand Down
9 changes: 2 additions & 7 deletions types/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type Block struct {
Uncles []*Header

// Cache
size atomic.Value // *uint64
size atomic.Pointer[uint64]
}

func (b *Block) Hash() Hash {
Expand Down Expand Up @@ -138,12 +138,7 @@ func (b *Block) Size() uint64 {
return size
}

sizeVal, ok := sizePtr.(*uint64)
if !ok {
return 0
}

return *sizeVal
return *sizePtr
}

func (b *Block) String() string {
Expand Down
Loading

0 comments on commit 93ac4d0

Please sign in to comment.