Skip to content

Commit

Permalink
chore(lib/common) implement byte pool to improve websocket subscripti…
Browse files Browse the repository at this point in the history
…on efficiency (ChainSafe#1693)

* implement bufferpool

* implement byte pool for subscription setup

* lint

* fix lint

* address comments

* refactor UnregisterFinalizedChannel to UnregisterFinalisedChannel

* lint

* change error handling

* refactor NumPooled to Len
  • Loading branch information
edwardmack authored and timwu20 committed Dec 6, 2021
1 parent 281eef2 commit 0639451
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 45 deletions.
2 changes: 1 addition & 1 deletion dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type BlockState interface {
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
UnregisterFinalisedChannel(id byte)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
SubChain(start, end common.Hash) ([]common.Hash, error)
GetBlockBody(hash common.Hash) (*types.Body, error)
Expand Down
2 changes: 1 addition & 1 deletion dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *Handler) Start() error {
func (h *Handler) Stop() error {
h.cancel()
h.blockState.UnregisterImportedChannel(h.importedID)
h.blockState.UnregisterFinalizedChannel(h.finalisedID)
h.blockState.UnregisterFinalisedChannel(h.finalisedID)
close(h.imported)
close(h.finalised)
return nil
Expand Down
2 changes: 1 addition & 1 deletion dot/digest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type BlockState interface {
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
UnregisterFinalisedChannel(id byte)
}

// EpochState is the interface for state.EpochState
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type BlockAPI interface {
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
UnregisterFinalisedChannel(id byte)
SubChain(start, end common.Hash) ([]common.Hash, error)
}

Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/mocks/block_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ type BlockState struct {
lastFinalised common.Hash

// block notifiers
imported map[byte]chan<- *types.Block
finalised map[byte]chan<- *types.FinalisationInfo
importedLock sync.RWMutex
finalisedLock sync.RWMutex
imported map[byte]chan<- *types.Block
finalised map[byte]chan<- *types.FinalisationInfo
importedLock sync.RWMutex
finalisedLock sync.RWMutex
importedBytePool *common.BytePool
finalisedBytePool *common.BytePool

pruneKeyCh chan *types.Header
}
Expand Down Expand Up @@ -80,6 +82,10 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e
return nil, fmt.Errorf("failed to get last finalised hash: %w", err)
}

bs.importedBytePool = common.NewBytePool256()

bs.finalisedBytePool = common.NewBytePool256()

return bs, nil
}

Expand Down Expand Up @@ -117,6 +123,10 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
return nil, err
}

bs.importedBytePool = common.NewBytePool256()

bs.finalisedBytePool = common.NewBytePool256()

return bs, nil
}

Expand Down
47 changes: 16 additions & 31 deletions dot/state/block_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package state

import (
"errors"
"math/rand"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
)
Expand All @@ -29,16 +26,9 @@ import (
func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, error) {
bs.importedLock.RLock()

if len(bs.imported) == 256 {
return 0, errors.New("channel limit reached")
}

var id byte
for {
id = generateID()
if bs.imported[id] == nil {
break
}
id, err := bs.importedBytePool.Get()
if err != nil {
return 0, err
}

bs.importedLock.RUnlock()
Expand All @@ -54,16 +44,9 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
bs.finalisedLock.RLock()

if len(bs.finalised) == 256 {
return 0, errors.New("channel limit reached")
}

var id byte
for {
id = generateID()
if bs.finalised[id] == nil {
break
}
id, err := bs.finalisedBytePool.Get()
if err != nil {
return 0, err
}

bs.finalisedLock.RUnlock()
Expand All @@ -81,15 +64,23 @@ func (bs *BlockState) UnregisterImportedChannel(id byte) {
defer bs.importedLock.Unlock()

delete(bs.imported, id)
err := bs.importedBytePool.Put(id)
if err != nil {
logger.Error("failed to unregister imported channel", "error", err)
}
}

// UnregisterFinalizedChannel removes the block finalisation notification channel with the given ID.
// UnregisterFinalisedChannel removes the block finalisation notification channel with the given ID.
// A channel must be unregistered before closing it.
func (bs *BlockState) UnregisterFinalizedChannel(id byte) {
func (bs *BlockState) UnregisterFinalisedChannel(id byte) {
bs.finalisedLock.Lock()
defer bs.finalisedLock.Unlock()

delete(bs.finalised, id)
err := bs.finalisedBytePool.Put(id)
if err != nil {
logger.Error("failed to unregister finalised channel", "error", err)
}
}

func (bs *BlockState) notifyImported(block *types.Block) {
Expand Down Expand Up @@ -141,9 +132,3 @@ func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) {
}(ch)
}
}

func generateID() byte {
// skipcq: GSC-G404
id := rand.Intn(256) //nolint
return byte(id)
}
4 changes: 2 additions & 2 deletions dot/state/block_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFinalizedChannel(t *testing.T) {
id, err := bs.RegisterFinalizedChannel(ch)
require.NoError(t, err)

defer bs.UnregisterFinalizedChannel(id)
defer bs.UnregisterFinalisedChannel(id)

chain, _ := AddBlocksToState(t, bs, 3)

Expand Down Expand Up @@ -150,6 +150,6 @@ func TestFinalizedChannel_Multi(t *testing.T) {
wg.Wait()

for _, id := range ids {
bs.UnregisterFinalizedChannel(id)
bs.UnregisterFinalisedChannel(id)
}
}
66 changes: 66 additions & 0 deletions lib/common/bytepool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package common

import "fmt"

// BytePool struct to hold byte objects that will be contained in pool
type BytePool struct {
c chan byte
}

// NewBytePool256 creates and initialises pool with 256 entries
func NewBytePool256() *BytePool {
bp := NewBytePool(256)
for i := 0; i < 256; i++ {
_ = bp.Put(byte(i))
}
return bp
}

// NewBytePool creates a new empty byte pool with capacity of size
func NewBytePool(size int) (bp *BytePool) {
return &BytePool{
c: make(chan byte, size),
}
}

// Get gets a Buffer from the BytePool, or creates a new one if none are
// available in the pool.
func (bp *BytePool) Get() (b byte, err error) {
select {
case b = <-bp.c:
default:
err = fmt.Errorf("all slots used")
}
return
}

// Put returns the given Buffer to the BytePool.
func (bp *BytePool) Put(b byte) error {
select {
case bp.c <- b:
return nil
default:
return fmt.Errorf("pool is full")
}
}

// Len returns the number of items currently pooled.
func (bp *BytePool) Len() int {
return len(bp.c)
}
62 changes: 62 additions & 0 deletions lib/common/bytepool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package common

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestBytePool(t *testing.T) {
bp := NewBytePool(5)
require.Equal(t, 0, bp.Len())

for i := 0; i < 5; i++ {
err := bp.Put(generateID())
require.NoError(t, err)
}
err := bp.Put(generateID())
require.EqualError(t, err, "pool is full")
require.Equal(t, 5, bp.Len())

for i := 0; i < 5; i++ {
_, err := bp.Get() // nolint
require.NoError(t, err)
}
_, err = bp.Get()
require.EqualError(t, err, "all slots used")
}

func TestBytePool256(t *testing.T) {
bp := NewBytePool256()
require.Equal(t, 256, bp.Len())

for i := 0; i < 256; i++ {
_, err := bp.Get() // nolint
require.NoError(t, err)
}
_, err := bp.Get()
require.EqualError(t, err, "all slots used")
}

func generateID() byte {
// skipcq: GSC-G404
id := rand.Intn(256) //nolint
return byte(id)
}
2 changes: 1 addition & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *Service) Stop() error {

s.cancel()

s.blockState.UnregisterFinalizedChannel(s.finalisedChID)
s.blockState.UnregisterFinalisedChannel(s.finalisedChID)
close(s.finalisedCh)

if !s.authority {
Expand Down
2 changes: 1 addition & 1 deletion lib/grandpa/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type BlockState interface {
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
UnregisterFinalisedChannel(id byte)
SetJustification(hash common.Hash, data []byte) error
HasJustification(hash common.Hash) (bool, error)
GetJustification(hash common.Hash) ([]byte, error)
Expand Down

0 comments on commit 0639451

Please sign in to comment.