From fd9b70d93cad7890205e728a5aa8a6ac130018f1 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 4 Feb 2022 12:05:29 -0400 Subject: [PATCH 1/2] fix(dot/network): memory improvement for network buffers (#2233) --- dot/network/inbound.go | 5 +- dot/network/notifications.go | 10 +-- dot/network/pool.go | 43 ------------- dot/network/pool_test.go | 114 ----------------------------------- dot/network/service.go | 16 ++--- 5 files changed, 14 insertions(+), 174 deletions(-) delete mode 100644 dot/network/pool.go delete mode 100644 dot/network/pool_test.go diff --git a/dot/network/inbound.go b/dot/network/inbound.go index 0af6e6c2ee..dce76de654 100644 --- a/dot/network/inbound.go +++ b/dot/network/inbound.go @@ -15,8 +15,9 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder s.streamManager.logNewStream(stream) peer := stream.Conn().RemotePeer() - msgBytes := s.bufPool.get() - defer s.bufPool.put(msgBytes) + buffer := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buffer) + msgBytes := *buffer for { n, err := readStream(stream, msgBytes[:]) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index a3b0006557..c55b063186 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -427,11 +427,11 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe hsC := make(chan *handshakeReader) go func() { - msgBytes := s.bufPool.get() - defer func() { - s.bufPool.put(msgBytes) - close(hsC) - }() + defer close(hsC) + + buffer := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buffer) + msgBytes := *buffer tot, err := readStream(stream, msgBytes[:]) if err != nil { diff --git a/dot/network/pool.go b/dot/network/pool.go deleted file mode 100644 index c517acf88d..0000000000 --- a/dot/network/pool.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package network - -// sizedBufferPool is a pool of buffers used for reading from streams -type sizedBufferPool struct { - c chan []byte -} - -func newSizedBufferPool(preAllocate, size int) (bp *sizedBufferPool) { - bufferCh := make(chan []byte, size) - - for i := 0; i < preAllocate; i++ { - buf := make([]byte, maxMessageSize) - bufferCh <- buf - } - - return &sizedBufferPool{ - c: bufferCh, - } -} - -// get gets a buffer from the sizedBufferPool, or creates a new one if none are -// available in the pool. Buffers have a pre-allocated capacity. -func (bp *sizedBufferPool) get() (b []byte) { - select { - case b = <-bp.c: - // reuse existing buffer - return b - default: - // create new buffer - return make([]byte, maxMessageSize) - } -} - -// put returns the given buffer to the sizedBufferPool. -func (bp *sizedBufferPool) put(b []byte) { - select { - case bp.c <- b: - default: // Discard the buffer if the pool is full. - } -} diff --git a/dot/network/pool_test.go b/dot/network/pool_test.go deleted file mode 100644 index c0c37a597b..0000000000 --- a/dot/network/pool_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package network - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func Benchmark_sizedBufferPool(b *testing.B) { - const preAllocate = 100 - const poolSize = 200 - sbp := newSizedBufferPool(preAllocate, poolSize) - - b.RunParallel(func(p *testing.PB) { - for p.Next() { - buffer := sbp.get() - buffer[0] = 1 - buffer[len(buffer)-1] = 1 - sbp.put(buffer) - } - }) -} - -// Before: 104853 11119 ns/op 65598 B/op 1 allocs/op -// Array ptr: 2742781 438.3 ns/op 2 B/op 0 allocs/op -// Slices: 2560960 463.8 ns/op 2 B/op 0 allocs/op -// Slice pointer: 2683528 460.8 ns/op 2 B/op 0 allocs/op - -func Test_sizedBufferPool(t *testing.T) { - t.Parallel() - - const preAlloc = 1 - const poolSize = 2 - const maxIndex = maxMessageSize - 1 - - pool := newSizedBufferPool(preAlloc, poolSize) - - first := pool.get() // pre-allocated one - first[maxIndex] = 1 - - second := pool.get() // new one - second[maxIndex] = 2 - - third := pool.get() // new one - third[maxIndex] = 3 - - fourth := pool.get() // new one - fourth[maxIndex] = 4 - - pool.put(fourth) - pool.put(third) - pool.put(second) // discarded - pool.put(first) // discarded - - b := pool.get() // fourth - assert.Equal(t, byte(4), b[maxIndex]) - - b = pool.get() // third - assert.Equal(t, byte(3), b[maxIndex]) -} - -func Test_sizedBufferPool_race(t *testing.T) { - t.Parallel() - - const preAlloc = 1 - const poolSize = 2 - - pool := newSizedBufferPool(preAlloc, poolSize) - - const parallelism = 4 - - readyWait := new(sync.WaitGroup) - readyWait.Add(parallelism) - - doneWait := new(sync.WaitGroup) - doneWait.Add(parallelism) - - // run for 50ms - ctxTimerStarted := make(chan struct{}) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - const timeout = 50 * time.Millisecond - readyWait.Wait() - ctx, cancel = context.WithTimeout(ctx, timeout) - close(ctxTimerStarted) - }() - defer cancel() - - for i := 0; i < parallelism; i++ { - go func() { - defer doneWait.Done() - readyWait.Done() - readyWait.Wait() - <-ctxTimerStarted - - for ctx.Err() != nil { - // test relies on the -race detector - // to detect concurrent writes to the buffer. - b := pool.get() - b[0] = 1 - pool.put(b) - } - }() - } - - doneWait.Wait() -} diff --git a/dot/network/service.go b/dot/network/service.go index 363d8ba6ef..cc78982699 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -107,7 +107,7 @@ type Service struct { host *host mdns *mdns gossip *gossip - bufPool *sizedBufferPool + bufPool *sync.Pool streamManager *streamManager notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info @@ -181,16 +181,12 @@ func NewService(cfg *Config) (*Service, error) { return nil, err } - // pre-allocate pool of buffers used to read from streams. - // initially allocate as many buffers as likely necessary which is the number of inbound streams we will have, - // which should equal the average number of peers times the number of notifications protocols, which is currently 3. - preAllocateInPool := cfg.MinPeers * 3 - poolSize := cfg.MaxPeers * 3 - if cfg.noPreAllocate { // testing - preAllocateInPool = 0 - poolSize = cfg.MinPeers * 3 + bufPool := &sync.Pool{ + New: func() interface{} { + b := make([]byte, maxMessageSize) + return &b + }, } - bufPool := newSizedBufferPool(preAllocateInPool, poolSize) network := &Service{ ctx: ctx, From fbd13d27f68b70ad660a6370535ef9ab74d3d296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Fri, 4 Feb 2022 12:43:38 -0400 Subject: [PATCH 2/2] fix(lib/trie): remove map deletion at `loadProof` (#2259) * fix: remove map deletion to every proof hash * chore: use `node` as variable name --- lib/trie/database.go | 1 - lib/trie/proof_test.go | 82 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/lib/trie/database.go b/lib/trie/database.go index 87a8425f71..312da1ca3d 100644 --- a/lib/trie/database.go +++ b/lib/trie/database.go @@ -137,7 +137,6 @@ func (t *Trie) loadProof(proofHashToNode map[string]Node, n Node) { if !ok { continue } - delete(proofHashToNode, proofHash) branch.Children[i] = node t.loadProof(proofHashToNode, node) diff --git a/lib/trie/proof_test.go b/lib/trie/proof_test.go index 7c190d1c3c..7ab5c60361 100644 --- a/lib/trie/proof_test.go +++ b/lib/trie/proof_test.go @@ -78,11 +78,10 @@ func testGenerateProof(t *testing.T, entries []Pair, keys [][]byte) ([]byte, [][ value := trie.Get(key) require.NotNil(t, value) - itemFromDB := Pair{ + items[idx] = Pair{ Key: key, Value: value, } - items[idx] = itemFromDB } return root, proof, items @@ -95,23 +94,23 @@ func TestVerifyProof_ShouldReturnTrue(t *testing.T) { {Key: []byte("alpha"), Value: make([]byte, 32)}, {Key: []byte("bravo"), Value: []byte("bravo")}, {Key: []byte("do"), Value: []byte("verb")}, - {Key: []byte("dog"), Value: []byte("puppy")}, - {Key: []byte("doge"), Value: make([]byte, 32)}, + {Key: []byte("dogea"), Value: []byte("puppy")}, + {Key: []byte("dogeb"), Value: []byte("puppy")}, {Key: []byte("horse"), Value: []byte("stallion")}, {Key: []byte("house"), Value: []byte("building")}, } keys := [][]byte{ []byte("do"), - []byte("dog"), - []byte("doge"), + []byte("dogea"), + []byte("dogeb"), } - root, proof, pl := testGenerateProof(t, entries, keys) + root, proof, pairs := testGenerateProof(t, entries, keys) + v, err := VerifyProof(proof, root, pairs) - v, err := VerifyProof(proof, root, pl) - require.True(t, v) require.NoError(t, err) + require.True(t, v) } func TestVerifyProof_ShouldReturnDuplicateKeysError(t *testing.T) { @@ -158,3 +157,68 @@ func TestVerifyProof_ShouldReturnTrueWithouCompareValues(t *testing.T) { require.True(t, v) require.NoError(t, err) } + +func TestBranchNodes_SameHash_DiferentPaths_GenerateAndVerifyProof(t *testing.T) { + value := []byte("somevalue") + entries := []Pair{ + {Key: []byte("d"), Value: value}, + {Key: []byte("b"), Value: value}, + {Key: []byte("dxyz"), Value: value}, + {Key: []byte("bxyz"), Value: value}, + {Key: []byte("dxyzi"), Value: value}, + {Key: []byte("bxyzi"), Value: value}, + } + + keys := [][]byte{ + []byte("d"), + []byte("b"), + []byte("dxyz"), + []byte("bxyz"), + []byte("dxyzi"), + []byte("bxyzi"), + } + + root, proof, pairs := testGenerateProof(t, entries, keys) + + ok, err := VerifyProof(proof, root, pairs) + require.NoError(t, err) + require.True(t, ok) +} + +func TestLeafNodes_SameHash_DifferentPaths_GenerateAndVerifyProof(t *testing.T) { + tmp := t.TempDir() + + memdb, err := chaindb.NewBadgerDB(&chaindb.Config{ + InMemory: true, + DataDir: tmp, + }) + require.NoError(t, err) + + var ( + value = []byte("somevalue") + key1 = []byte("worlda") + key2 = []byte("worldb") + ) + + tt := NewEmptyTrie() + tt.Put(key1, value) + tt.Put(key2, value) + + err = tt.Store(memdb) + require.NoError(t, err) + + hash, err := tt.Hash() + require.NoError(t, err) + + proof, err := GenerateProof(hash.ToBytes(), [][]byte{key1, key2}, memdb) + require.NoError(t, err) + + pairs := []Pair{ + {Key: key1, Value: value}, + {Key: key2, Value: value}, + } + + ok, err := VerifyProof(proof, hash.ToBytes(), pairs) + require.NoError(t, err) + require.True(t, ok) +}