From 499a7f2334166919addd7ce7b00d59c92dfba6d4 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Thu, 17 Aug 2023 16:36:44 -0400 Subject: [PATCH 01/22] add sampling interface --- network/p2p/client.go | 16 ++- network/p2p/node_sampler.go | 92 ++++++++++++++++++ network/p2p/node_sampler_test.go | 162 +++++++++++++++++++++++++++++++ network/p2p/router.go | 26 +---- network/p2p/router_test.go | 89 ++--------------- 5 files changed, 274 insertions(+), 111 deletions(-) create mode 100644 network/p2p/node_sampler.go create mode 100644 network/p2p/node_sampler_test.go diff --git a/network/p2p/client.go b/network/p2p/client.go index 6aab8d815ae..1d1f95d5db5 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -41,6 +41,7 @@ type Client struct { handlerPrefix []byte router *Router sender common.AppSender + nodeSampler NodeSampler } // AppRequestAny issues an AppRequest to an arbitrary node decided by Client. @@ -51,15 +52,20 @@ func (c *Client) AppRequestAny( appRequestBytes []byte, onResponse AppResponseCallback, ) error { - c.router.lock.RLock() - peers := c.router.peers.Sample(1) - c.router.lock.RUnlock() + sampled, err := c.nodeSampler.Sample(1) + if err != nil { + return fmt.Errorf("failed to sample peers: %w", err) + } - if len(peers) != 1 { + if len(sampled) != 1 { return ErrNoPeers } - nodeIDs := set.Of(peers[0]) + nodeIDs := set.NewSet[ids.NodeID](len(sampled)) + for _, s := range sampled { + nodeIDs.Add(s) + } + return c.AppRequest(ctx, nodeIDs, appRequestBytes, onResponse) } diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go new file mode 100644 index 00000000000..9f488608626 --- /dev/null +++ b/network/p2p/node_sampler.go @@ -0,0 +1,92 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "fmt" + "sync" + + "golang.org/x/exp/maps" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/validators" + safemath "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/sampler" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" +) + +var ( + _ NodeSampler = (*Validators)(nil) + + _ validators.Connector = (*Peers)(nil) + _ NodeSampler = (*Peers)(nil) +) + +type NodeSampler interface { + // Sample returns at most [n] nodes. This may return fewer nodes if fewer + // than [n] are available. + Sample(n int) ([]ids.NodeID, error) +} + +type Validators struct { + SubnetID ids.ID + Validators validators.State +} + +func (v Validators) Sample(n int) ([]ids.NodeID, error) { + currentHeight, err := v.Validators.GetCurrentHeight(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to get current height: %w", err) + } + + validatorSet, err := v.Validators.GetValidatorSet(context.TODO(), currentHeight, v.SubnetID) + if err != nil { + return nil, fmt.Errorf("failed to get current validator set: %w", err) + } + + validatorIDs := maps.Keys(validatorSet) + + s := sampler.NewUniform() + s.Initialize(uint64(len(validatorIDs))) + + indices, _ := s.Sample(safemath.Min(len(validatorIDs), n)) + sampled := make([]ids.NodeID, 0, len(indices)) + for _, i := range indices { + sampled = append(sampled, validatorIDs[i]) + } + + return sampled, nil +} + +type Peers struct { + lock sync.RWMutex + peers set.SampleableSet[ids.NodeID] +} + +func (p *Peers) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.peers.Add(nodeID) + + return nil +} + +func (p *Peers) Disconnected(_ context.Context, nodeID ids.NodeID) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.peers.Remove(nodeID) + + return nil +} + +func (p *Peers) Sample(n int) ([]ids.NodeID, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.peers.Sample(n), nil +} diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go new file mode 100644 index 00000000000..09005e433b8 --- /dev/null +++ b/network/p2p/node_sampler_test.go @@ -0,0 +1,162 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/validators" + safemath "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/set" +) + +func TestPeersSample(t *testing.T) { + nodeID1 := ids.GenerateTestNodeID() + nodeID2 := ids.GenerateTestNodeID() + nodeID3 := ids.GenerateTestNodeID() + + tests := []struct { + name string + connected set.Set[ids.NodeID] + disconnected set.Set[ids.NodeID] + n int + }{ + { + name: "no peers", + n: 1, + }, + { + name: "one peer connected", + connected: set.Of[ids.NodeID](nodeID1), + n: 1, + }, + { + name: "multiple peers connected", + connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + n: 1, + }, + { + name: "peer connects and disconnects - 1", + connected: set.Of[ids.NodeID](nodeID1), + disconnected: set.Of[ids.NodeID](nodeID1), + n: 1, + }, + { + name: "peer connects and disconnects - 2", + connected: set.Of[ids.NodeID](nodeID1, nodeID2), + disconnected: set.Of[ids.NodeID](nodeID2), + n: 1, + }, + { + name: "peer connects and disconnects - 2", + connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + disconnected: set.Of[ids.NodeID](nodeID1, nodeID2), + n: 1, + }, + { + name: "less than n peers", + connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + n: 4, + }, + { + name: "n peers", + connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + n: 3, + }, + { + name: "more than n peers", + connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + n: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + sampler := Peers{} + + for connected := range tt.connected { + require.NoError(sampler.Connected(context.Background(), connected, nil)) + } + + for disconnected := range tt.disconnected { + require.NoError(sampler.Disconnected(context.Background(), disconnected)) + } + + sampleable := set.Set[ids.NodeID]{} + sampleable.Union(tt.connected) + sampleable.Difference(tt.disconnected) + + sampled, err := sampler.Sample(tt.n) + require.NoError(err) + require.Len(sampled, safemath.Min(tt.n, len(sampleable))) + require.Subset(sampleable, sampled) + }) + } +} + +func TestValidatorsSample(t *testing.T) { + tests := []struct { + name string + validators []ids.NodeID + n int + }{ + { + name: "less than n validators", + validators: []ids.NodeID{ids.GenerateTestNodeID()}, + n: 2, + }, + { + name: "equal to n validators", + validators: []ids.NodeID{ + ids.GenerateTestNodeID(), + ids.GenerateTestNodeID(), + }, + n: 2, + }, + { + name: "greater than n validators", + validators: []ids.NodeID{ + ids.GenerateTestNodeID(), + ids.GenerateTestNodeID(), + ids.GenerateTestNodeID(), + }, + n: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + subnetID := ids.GenerateTestID() + + height := uint64(1234) + mockValidators := validators.NewMockState(ctrl) + mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil) + + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) + for _, validator := range tt.validators { + validatorSet[validator] = nil + + } + mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil) + + v := Validators{ + SubnetID: subnetID, + Validators: mockValidators, + } + + sampled, err := v.Sample(tt.n) + require.NoError(err) + require.Len(sampled, safemath.Min(tt.n, len(tt.validators))) + }) + } +} diff --git a/network/p2p/router.go b/network/p2p/router.go index a685abc7e50..4fca4bf0518 100644 --- a/network/p2p/router.go +++ b/network/p2p/router.go @@ -16,18 +16,14 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/avalanchego/version" ) var ( ErrExistingAppProtocol = errors.New("existing app protocol") ErrUnrequestedResponse = errors.New("unrequested response") - _ common.AppHandler = (*Router)(nil) - _ validators.Connector = (*Router)(nil) + _ common.AppHandler = (*Router)(nil) ) // Router routes incoming application messages to the corresponding registered @@ -42,7 +38,6 @@ type Router struct { pendingAppRequests map[uint32]AppResponseCallback pendingCrossChainAppRequests map[uint32]CrossChainAppResponseCallback requestID uint32 - peers set.SampleableSet[ids.NodeID] } // NewRouter returns a new instance of Router @@ -56,26 +51,10 @@ func NewRouter(log logging.Logger, sender common.AppSender) *Router { } } -func (r *Router) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error { - r.lock.Lock() - defer r.lock.Unlock() - - r.peers.Add(nodeID) - return nil -} - -func (r *Router) Disconnected(_ context.Context, nodeID ids.NodeID) error { - r.lock.Lock() - defer r.lock.Unlock() - - r.peers.Remove(nodeID) - return nil -} - // RegisterAppProtocol reserves an identifier for an application protocol and // returns a Client that can be used to send messages for the corresponding // protocol. -func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler) (*Client, error) { +func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler, nodeSampler NodeSampler) (*Client, error) { r.lock.Lock() defer r.lock.Unlock() @@ -94,6 +73,7 @@ func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler) (*Client handlerPrefix: binary.AppendUvarint(nil, handlerID), sender: r.sender, router: r, + nodeSampler: nodeSampler, }, nil } diff --git a/network/p2p/router_test.go b/network/p2p/router_test.go index d9590428dcd..3953538b76e 100644 --- a/network/p2p/router_test.go +++ b/network/p2p/router_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" "github.com/ava-labs/avalanchego/ids" @@ -192,9 +191,9 @@ func TestAppRequestResponse(t *testing.T) { sender := common.NewMockSender(ctrl) handler := mocks.NewMockHandler(ctrl) router := NewRouter(logging.NoLog{}, sender) - require.NoError(router.Connected(context.Background(), nodeID, nil)) - - client, err := router.RegisterAppProtocol(handlerID, handler) + sampler := &Peers{} + require.NoError(sampler.Connected(context.Background(), nodeID, nil)) + client, err := router.RegisterAppProtocol(handlerID, handler, sampler) require.NoError(err) wg := &sync.WaitGroup{} @@ -330,7 +329,9 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { }).AnyTimes() sender.EXPECT().SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), response) - client, err := router.RegisterAppProtocol(0x1, handler) + sampler := &Peers{} + require.NoError(sampler.Connected(context.Background(), nodeID, nil)) + client, err := router.RegisterAppProtocol(0x1, handler, sampler) require.NoError(err) require.NoError(client.AppRequest(context.Background(), set.Of(nodeID), []byte{}, nil)) @@ -345,81 +346,3 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { timeout.Done() } - -func TestRouterConnected(t *testing.T) { - tests := []struct { - name string - connect []ids.NodeID - disconnect []ids.NodeID - }{ - { - name: "empty", - }, - { - name: "connect and disconnect", - connect: []ids.NodeID{ - {0x0}, - }, - disconnect: []ids.NodeID{ - {0x0}, - }, - }, - { - name: "two nodes connect", - connect: []ids.NodeID{ - {0x0, 0x1}, - }, - }, - { - name: "two nodes connect, last one disconnects", - connect: []ids.NodeID{ - {0x0, 0x1}, - }, - disconnect: []ids.NodeID{ - {0x1}, - }, - }, - { - name: "two nodes connect, first one disconnects", - connect: []ids.NodeID{ - {0x0, 0x1}, - }, - disconnect: []ids.NodeID{ - {0x0}, - }, - }, - { - name: "two nodes connect and disconnect", - connect: []ids.NodeID{ - {0x0, 0x1}, - }, - disconnect: []ids.NodeID{ - {0x0, 0x1}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - router := NewRouter(logging.NoLog{}, nil) - - expected := set.Set[ids.NodeID]{} - - for _, connect := range tt.connect { - expected.Add(connect) - require.NoError(router.Connected(context.Background(), connect, nil)) - } - - for _, disconnect := range tt.disconnect { - expected.Remove(disconnect) - require.NoError(router.Disconnected(context.Background(), disconnect)) - } - - require.Len(expected, router.peers.Len()) - for _, peer := range router.peers.List() { - require.Contains(expected, peer) - } - }) - } -} From fb4573f69a1b32cbdf2c31fc9cb0d2f6a0928c01 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:54:23 -0400 Subject: [PATCH 02/22] cache validator sets --- network/p2p/node_sampler.go | 41 +++++++++++++++++---- network/p2p/node_sampler_test.go | 61 ++++++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 9 deletions(-) diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go index 9f488608626..0519f607f22 100644 --- a/network/p2p/node_sampler.go +++ b/network/p2p/node_sampler.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "sync" + "time" "golang.org/x/exp/maps" @@ -18,6 +19,10 @@ import ( "github.com/ava-labs/avalanchego/version" ) +const ( + maxValidatorSetStaleness = time.Minute +) + var ( _ NodeSampler = (*Validators)(nil) @@ -31,23 +36,47 @@ type NodeSampler interface { Sample(n int) ([]ids.NodeID, error) } +func NewValidators(subnetID ids.ID, validators validators.State) *Validators { + return &Validators{ + subnetID: subnetID, + validators: validators, + maxValidatorSetStaleness: maxValidatorSetStaleness, + } +} + type Validators struct { - SubnetID ids.ID - Validators validators.State + subnetID ids.ID + validators validators.State + + recentValidatorIDs []ids.NodeID + lastUpdated time.Time + maxValidatorSetStaleness time.Duration } -func (v Validators) Sample(n int) ([]ids.NodeID, error) { - currentHeight, err := v.Validators.GetCurrentHeight(context.TODO()) +func (v Validators) getRecentValidatorIDs() ([]ids.NodeID, error) { + if time.Since(v.lastUpdated) < v.maxValidatorSetStaleness { + return v.recentValidatorIDs, nil + } + + height, err := v.validators.GetCurrentHeight(context.TODO()) if err != nil { return nil, fmt.Errorf("failed to get current height: %w", err) } - validatorSet, err := v.Validators.GetValidatorSet(context.TODO(), currentHeight, v.SubnetID) + validatorSet, err := v.validators.GetValidatorSet(context.TODO(), height, v.subnetID) if err != nil { return nil, fmt.Errorf("failed to get current validator set: %w", err) } - validatorIDs := maps.Keys(validatorSet) + v.recentValidatorIDs = maps.Keys(validatorSet) + return v.recentValidatorIDs, nil +} + +func (v Validators) Sample(n int) ([]ids.NodeID, error) { + validatorIDs, err := v.getRecentValidatorIDs() + if err != nil { + return nil, err + } s := sampler.NewUniform() s.Initialize(uint64(len(validatorIDs))) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index 09005e433b8..039601d637e 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -6,6 +6,7 @@ package p2p import ( "context" "testing" + "time" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -145,13 +146,12 @@ func TestValidatorsSample(t *testing.T) { validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) for _, validator := range tt.validators { validatorSet[validator] = nil - } mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil) v := Validators{ - SubnetID: subnetID, - Validators: mockValidators, + subnetID: subnetID, + validators: mockValidators, } sampled, err := v.Sample(tt.n) @@ -160,3 +160,58 @@ func TestValidatorsSample(t *testing.T) { }) } } + +// invariant: we should only call GetValidatorSet when it passes a max staleness +// threshold +func TestValidatorsSampleCaching(t *testing.T) { + tests := []struct { + name string + validators []ids.NodeID + maxStaleness time.Duration + elapsed time.Duration + expectedCalls int + }{ + { + name: "within max threshold", + validators: []ids.NodeID{ids.GenerateTestNodeID()}, + maxStaleness: time.Hour, + elapsed: time.Second, + }, + { + name: "beyond max threshold", + validators: []ids.NodeID{ids.GenerateTestNodeID()}, + maxStaleness: time.Hour, + elapsed: time.Hour + 1, + expectedCalls: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + subnetID := ids.GenerateTestID() + + height := uint64(1234) + mockValidators := validators.NewMockState(ctrl) + mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil).Times(tt.expectedCalls) + + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) + for _, validator := range tt.validators { + validatorSet[validator] = nil + } + mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) + + v := Validators{ + subnetID: subnetID, + validators: mockValidators, + maxValidatorSetStaleness: tt.maxStaleness, + } + v.lastUpdated = time.Now().Add(-tt.elapsed) + + _, err := v.Sample(1) + require.NoError(err) + }) + } +} From 25d8e69c3e70b1f805c1e371c29109310dc5f272 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:59:47 -0400 Subject: [PATCH 03/22] use of --- network/p2p/client.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/network/p2p/client.go b/network/p2p/client.go index 1d1f95d5db5..0d6f2e564f6 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -61,11 +61,7 @@ func (c *Client) AppRequestAny( return ErrNoPeers } - nodeIDs := set.NewSet[ids.NodeID](len(sampled)) - for _, s := range sampled { - nodeIDs.Add(s) - } - + nodeIDs := set.Of(sampled...) return c.AppRequest(ctx, nodeIDs, appRequestBytes, onResponse) } From 203dbe6336e89262cf6df8634fe3252e0b31d22d Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 17:01:02 -0400 Subject: [PATCH 04/22] nit --- network/p2p/node_sampler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go index 0519f607f22..adc2953384a 100644 --- a/network/p2p/node_sampler.go +++ b/network/p2p/node_sampler.go @@ -53,7 +53,7 @@ type Validators struct { maxValidatorSetStaleness time.Duration } -func (v Validators) getRecentValidatorIDs() ([]ids.NodeID, error) { +func (v *Validators) getRecentValidatorIDs() ([]ids.NodeID, error) { if time.Since(v.lastUpdated) < v.maxValidatorSetStaleness { return v.recentValidatorIDs, nil } @@ -72,7 +72,7 @@ func (v Validators) getRecentValidatorIDs() ([]ids.NodeID, error) { return v.recentValidatorIDs, nil } -func (v Validators) Sample(n int) ([]ids.NodeID, error) { +func (v *Validators) Sample(n int) ([]ids.NodeID, error) { validatorIDs, err := v.getRecentValidatorIDs() if err != nil { return nil, err From fc6b4fbf2e974a39052532fe7bc669a56499cd45 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 17:05:27 -0400 Subject: [PATCH 05/22] nit --- network/p2p/node_sampler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index 039601d637e..71c73808d7f 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -203,7 +203,7 @@ func TestValidatorsSampleCaching(t *testing.T) { } mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) - v := Validators{ + v := &Validators{ subnetID: subnetID, validators: mockValidators, maxValidatorSetStaleness: tt.maxStaleness, From 77849bb9d1f3539c1937c7ff57136fe2315e1bbf Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 17:06:08 -0400 Subject: [PATCH 06/22] nit --- network/p2p/node_sampler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index 71c73808d7f..6817736e9b0 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -80,7 +80,7 @@ func TestPeersSample(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - sampler := Peers{} + sampler := &Peers{} for connected := range tt.connected { require.NoError(sampler.Connected(context.Background(), connected, nil)) @@ -149,7 +149,7 @@ func TestValidatorsSample(t *testing.T) { } mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil) - v := Validators{ + v := &Validators{ subnetID: subnetID, validators: mockValidators, } From cb1b78f917bc15236959bc6d6904d17e21f4ea97 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 18 Aug 2023 21:01:37 -0400 Subject: [PATCH 07/22] cleanup --- network/p2p/node_sampler_test.go | 34 +++++++++++++------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index 6817736e9b0..da5bd64bbb8 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -34,45 +34,45 @@ func TestPeersSample(t *testing.T) { }, { name: "one peer connected", - connected: set.Of[ids.NodeID](nodeID1), + connected: set.Of(nodeID1), n: 1, }, { name: "multiple peers connected", - connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + connected: set.Of(nodeID1, nodeID2, nodeID3), n: 1, }, { name: "peer connects and disconnects - 1", - connected: set.Of[ids.NodeID](nodeID1), - disconnected: set.Of[ids.NodeID](nodeID1), + connected: set.Of(nodeID1), + disconnected: set.Of(nodeID1), n: 1, }, { name: "peer connects and disconnects - 2", - connected: set.Of[ids.NodeID](nodeID1, nodeID2), - disconnected: set.Of[ids.NodeID](nodeID2), + connected: set.Of(nodeID1, nodeID2), + disconnected: set.Of(nodeID2), n: 1, }, { name: "peer connects and disconnects - 2", - connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), - disconnected: set.Of[ids.NodeID](nodeID1, nodeID2), + connected: set.Of(nodeID1, nodeID2, nodeID3), + disconnected: set.Of(nodeID1, nodeID2), n: 1, }, { name: "less than n peers", - connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + connected: set.Of(nodeID1, nodeID2, nodeID3), n: 4, }, { name: "n peers", - connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + connected: set.Of(nodeID1, nodeID2, nodeID3), n: 3, }, { name: "more than n peers", - connected: set.Of[ids.NodeID](nodeID1, nodeID2, nodeID3), + connected: set.Of(nodeID1, nodeID2, nodeID3), n: 2, }, } @@ -149,10 +149,7 @@ func TestValidatorsSample(t *testing.T) { } mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil) - v := &Validators{ - subnetID: subnetID, - validators: mockValidators, - } + v := NewValidators(subnetID, mockValidators) sampled, err := v.Sample(tt.n) require.NoError(err) @@ -203,11 +200,8 @@ func TestValidatorsSampleCaching(t *testing.T) { } mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) - v := &Validators{ - subnetID: subnetID, - validators: mockValidators, - maxValidatorSetStaleness: tt.maxStaleness, - } + v := NewValidators(subnetID, mockValidators) + v.maxValidatorSetStaleness = tt.maxStaleness v.lastUpdated = time.Now().Add(-tt.elapsed) _, err := v.Sample(1) From 0af14730880f304437995f308216f60bd06dc590 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 17:20:21 -0400 Subject: [PATCH 08/22] Update network/p2p/node_sampler.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/node_sampler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go index adc2953384a..7f7f1c2958e 100644 --- a/network/p2p/node_sampler.go +++ b/network/p2p/node_sampler.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" - safemath "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/sampler" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" From d140e94d3119d162e983d067be77084b4116bd63 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 17:35:41 -0400 Subject: [PATCH 09/22] Update network/p2p/node_sampler_test.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/node_sampler_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index da5bd64bbb8..a819874d738 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "github.com/ava-labs/avalanchego/ids" From d4379983d816f1a770674bd30352735edac45f35 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 17:35:53 -0400 Subject: [PATCH 10/22] Update network/p2p/node_sampler_test.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/node_sampler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go index a819874d738..3bd8b42607c 100644 --- a/network/p2p/node_sampler_test.go +++ b/network/p2p/node_sampler_test.go @@ -14,7 +14,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" - safemath "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/set" ) From 167ba94db31a158971cfcdf3966adeca69367e1c Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 22:55:25 -0400 Subject: [PATCH 11/22] nits --- network/p2p/client.go | 6 +- network/p2p/node_sampler.go | 112 +--------------- network/p2p/node_sampler_test.go | 212 ------------------------------- network/p2p/peers.go | 50 ++++++++ network/p2p/peers_test.go | 146 +++++++++++++++++++++ network/p2p/router_test.go | 12 +- network/p2p/validators.go | 75 +++++++++++ network/p2p/validators_test.go | 144 +++++++++++++++++++++ 8 files changed, 426 insertions(+), 331 deletions(-) delete mode 100644 network/p2p/node_sampler_test.go create mode 100644 network/p2p/peers.go create mode 100644 network/p2p/peers_test.go create mode 100644 network/p2p/validators.go create mode 100644 network/p2p/validators_test.go diff --git a/network/p2p/client.go b/network/p2p/client.go index 0d6f2e564f6..4fae1cbe53f 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -52,11 +52,7 @@ func (c *Client) AppRequestAny( appRequestBytes []byte, onResponse AppResponseCallback, ) error { - sampled, err := c.nodeSampler.Sample(1) - if err != nil { - return fmt.Errorf("failed to sample peers: %w", err) - } - + sampled := c.nodeSampler.Sample(ctx, 1) if len(sampled) != 1 { return ErrNoPeers } diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go index 7f7f1c2958e..cab7a4d24fe 100644 --- a/network/p2p/node_sampler.go +++ b/network/p2p/node_sampler.go @@ -5,117 +5,13 @@ package p2p import ( "context" - "fmt" - "sync" - "time" - - "golang.org/x/exp/maps" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/utils/math" - "github.com/ava-labs/avalanchego/utils/sampler" - "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/avalanchego/version" -) - -const ( - maxValidatorSetStaleness = time.Minute -) - -var ( - _ NodeSampler = (*Validators)(nil) - - _ validators.Connector = (*Peers)(nil) - _ NodeSampler = (*Peers)(nil) ) +// NodeSampler samples members of the p2p network. type NodeSampler interface { - // Sample returns at most [n] nodes. This may return fewer nodes if fewer - // than [n] are available. - Sample(n int) ([]ids.NodeID, error) -} - -func NewValidators(subnetID ids.ID, validators validators.State) *Validators { - return &Validators{ - subnetID: subnetID, - validators: validators, - maxValidatorSetStaleness: maxValidatorSetStaleness, - } -} - -type Validators struct { - subnetID ids.ID - validators validators.State - - recentValidatorIDs []ids.NodeID - lastUpdated time.Time - maxValidatorSetStaleness time.Duration -} - -func (v *Validators) getRecentValidatorIDs() ([]ids.NodeID, error) { - if time.Since(v.lastUpdated) < v.maxValidatorSetStaleness { - return v.recentValidatorIDs, nil - } - - height, err := v.validators.GetCurrentHeight(context.TODO()) - if err != nil { - return nil, fmt.Errorf("failed to get current height: %w", err) - } - - validatorSet, err := v.validators.GetValidatorSet(context.TODO(), height, v.subnetID) - if err != nil { - return nil, fmt.Errorf("failed to get current validator set: %w", err) - } - - v.recentValidatorIDs = maps.Keys(validatorSet) - return v.recentValidatorIDs, nil -} - -func (v *Validators) Sample(n int) ([]ids.NodeID, error) { - validatorIDs, err := v.getRecentValidatorIDs() - if err != nil { - return nil, err - } - - s := sampler.NewUniform() - s.Initialize(uint64(len(validatorIDs))) - - indices, _ := s.Sample(safemath.Min(len(validatorIDs), n)) - sampled := make([]ids.NodeID, 0, len(indices)) - for _, i := range indices { - sampled = append(sampled, validatorIDs[i]) - } - - return sampled, nil -} - -type Peers struct { - lock sync.RWMutex - peers set.SampleableSet[ids.NodeID] -} - -func (p *Peers) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error { - p.lock.Lock() - defer p.lock.Unlock() - - p.peers.Add(nodeID) - - return nil -} - -func (p *Peers) Disconnected(_ context.Context, nodeID ids.NodeID) error { - p.lock.Lock() - defer p.lock.Unlock() - - p.peers.Remove(nodeID) - - return nil -} - -func (p *Peers) Sample(n int) ([]ids.NodeID, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.peers.Sample(n), nil + // Sample returns at most [limit] nodes. This may return fewer nodes if + // fewer than [limit] are available. + Sample(ctx context.Context, limit int) []ids.NodeID } diff --git a/network/p2p/node_sampler_test.go b/network/p2p/node_sampler_test.go deleted file mode 100644 index 3bd8b42607c..00000000000 --- a/network/p2p/node_sampler_test.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package p2p - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "go.uber.org/mock/gomock" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/utils/math" - "github.com/ava-labs/avalanchego/utils/set" -) - -func TestPeersSample(t *testing.T) { - nodeID1 := ids.GenerateTestNodeID() - nodeID2 := ids.GenerateTestNodeID() - nodeID3 := ids.GenerateTestNodeID() - - tests := []struct { - name string - connected set.Set[ids.NodeID] - disconnected set.Set[ids.NodeID] - n int - }{ - { - name: "no peers", - n: 1, - }, - { - name: "one peer connected", - connected: set.Of(nodeID1), - n: 1, - }, - { - name: "multiple peers connected", - connected: set.Of(nodeID1, nodeID2, nodeID3), - n: 1, - }, - { - name: "peer connects and disconnects - 1", - connected: set.Of(nodeID1), - disconnected: set.Of(nodeID1), - n: 1, - }, - { - name: "peer connects and disconnects - 2", - connected: set.Of(nodeID1, nodeID2), - disconnected: set.Of(nodeID2), - n: 1, - }, - { - name: "peer connects and disconnects - 2", - connected: set.Of(nodeID1, nodeID2, nodeID3), - disconnected: set.Of(nodeID1, nodeID2), - n: 1, - }, - { - name: "less than n peers", - connected: set.Of(nodeID1, nodeID2, nodeID3), - n: 4, - }, - { - name: "n peers", - connected: set.Of(nodeID1, nodeID2, nodeID3), - n: 3, - }, - { - name: "more than n peers", - connected: set.Of(nodeID1, nodeID2, nodeID3), - n: 2, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - sampler := &Peers{} - - for connected := range tt.connected { - require.NoError(sampler.Connected(context.Background(), connected, nil)) - } - - for disconnected := range tt.disconnected { - require.NoError(sampler.Disconnected(context.Background(), disconnected)) - } - - sampleable := set.Set[ids.NodeID]{} - sampleable.Union(tt.connected) - sampleable.Difference(tt.disconnected) - - sampled, err := sampler.Sample(tt.n) - require.NoError(err) - require.Len(sampled, safemath.Min(tt.n, len(sampleable))) - require.Subset(sampleable, sampled) - }) - } -} - -func TestValidatorsSample(t *testing.T) { - tests := []struct { - name string - validators []ids.NodeID - n int - }{ - { - name: "less than n validators", - validators: []ids.NodeID{ids.GenerateTestNodeID()}, - n: 2, - }, - { - name: "equal to n validators", - validators: []ids.NodeID{ - ids.GenerateTestNodeID(), - ids.GenerateTestNodeID(), - }, - n: 2, - }, - { - name: "greater than n validators", - validators: []ids.NodeID{ - ids.GenerateTestNodeID(), - ids.GenerateTestNodeID(), - ids.GenerateTestNodeID(), - }, - n: 2, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - ctrl := gomock.NewController(t) - - subnetID := ids.GenerateTestID() - - height := uint64(1234) - mockValidators := validators.NewMockState(ctrl) - mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil) - - validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) - for _, validator := range tt.validators { - validatorSet[validator] = nil - } - mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil) - - v := NewValidators(subnetID, mockValidators) - - sampled, err := v.Sample(tt.n) - require.NoError(err) - require.Len(sampled, safemath.Min(tt.n, len(tt.validators))) - }) - } -} - -// invariant: we should only call GetValidatorSet when it passes a max staleness -// threshold -func TestValidatorsSampleCaching(t *testing.T) { - tests := []struct { - name string - validators []ids.NodeID - maxStaleness time.Duration - elapsed time.Duration - expectedCalls int - }{ - { - name: "within max threshold", - validators: []ids.NodeID{ids.GenerateTestNodeID()}, - maxStaleness: time.Hour, - elapsed: time.Second, - }, - { - name: "beyond max threshold", - validators: []ids.NodeID{ids.GenerateTestNodeID()}, - maxStaleness: time.Hour, - elapsed: time.Hour + 1, - expectedCalls: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - ctrl := gomock.NewController(t) - - subnetID := ids.GenerateTestID() - - height := uint64(1234) - mockValidators := validators.NewMockState(ctrl) - mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil).Times(tt.expectedCalls) - - validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) - for _, validator := range tt.validators { - validatorSet[validator] = nil - } - mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) - - v := NewValidators(subnetID, mockValidators) - v.maxValidatorSetStaleness = tt.maxStaleness - v.lastUpdated = time.Now().Add(-tt.elapsed) - - _, err := v.Sample(1) - require.NoError(err) - }) - } -} diff --git a/network/p2p/peers.go b/network/p2p/peers.go new file mode 100644 index 00000000000..47982aeb2dc --- /dev/null +++ b/network/p2p/peers.go @@ -0,0 +1,50 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" +) + +var ( + _ validators.Connector = (*Peers)(nil) + _ NodeSampler = (*Peers)(nil) +) + +// Peers contains a set of nodes that we are connected to. +type Peers struct { + lock sync.RWMutex + peers set.SampleableSet[ids.NodeID] +} + +func (p *Peers) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.peers.Add(nodeID) + + return nil +} + +func (p *Peers) Disconnected(_ context.Context, nodeID ids.NodeID) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.peers.Remove(nodeID) + + return nil +} + +func (p *Peers) Sample(_ context.Context, limit int) []ids.NodeID { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.peers.Sample(limit) +} diff --git a/network/p2p/peers_test.go b/network/p2p/peers_test.go new file mode 100644 index 00000000000..ba428971cb0 --- /dev/null +++ b/network/p2p/peers_test.go @@ -0,0 +1,146 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/set" +) + +// Sample should always return up to [limit] peers, and less if fewer than +// [limit] peers are available. +func TestPeersSample(t *testing.T) { + nodeID1 := ids.GenerateTestNodeID() + nodeID2 := ids.GenerateTestNodeID() + nodeID3 := ids.GenerateTestNodeID() + + tests := []struct { + name string + connected set.Set[ids.NodeID] + disconnected set.Set[ids.NodeID] + limit int + }{ + { + name: "no peers", + limit: 1, + }, + { + name: "one peer connected", + connected: set.Of(nodeID1), + limit: 1, + }, + { + name: "multiple peers connected", + connected: set.Of(nodeID1, nodeID2, nodeID3), + limit: 1, + }, + { + name: "peer connects and disconnects - 1", + connected: set.Of(nodeID1), + disconnected: set.Of(nodeID1), + limit: 1, + }, + { + name: "peer connects and disconnects - 2", + connected: set.Of(nodeID1, nodeID2), + disconnected: set.Of(nodeID2), + limit: 1, + }, + { + name: "peer connects and disconnects - 2", + connected: set.Of(nodeID1, nodeID2, nodeID3), + disconnected: set.Of(nodeID1, nodeID2), + limit: 1, + }, + { + name: "less than limit peers", + connected: set.Of(nodeID1, nodeID2, nodeID3), + limit: 4, + }, + { + name: "limit peers", + connected: set.Of(nodeID1, nodeID2, nodeID3), + limit: 3, + }, + { + name: "more than limit peers", + connected: set.Of(nodeID1, nodeID2, nodeID3), + limit: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + peers := &Peers{} + + for connected := range tt.connected { + require.NoError(peers.Connected(context.Background(), connected, nil)) + } + + for disconnected := range tt.disconnected { + require.NoError(peers.Disconnected(context.Background(), disconnected)) + } + + sampleable := set.Set[ids.NodeID]{} + sampleable.Union(tt.connected) + sampleable.Difference(tt.disconnected) + + sampled := peers.Sample(context.Background(), tt.limit) + require.Len(sampled, math.Min(tt.limit, len(sampleable))) + require.Subset(sampleable, sampled) + }) + } +} + +func TestAppRequestAnyNodeSelection(t *testing.T) { + tests := []struct { + name string + peers []ids.NodeID + expected error + }{ + { + name: "no peers", + expected: ErrNoPeers, + }, + { + name: "has peers", + peers: []ids.NodeID{ids.GenerateTestNodeID()}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + mockAppSender := common.NewMockSender(ctrl) + + expectedCalls := 0 + if tt.expected == nil { + expectedCalls = 1 + } + mockAppSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(expectedCalls) + + r := NewRouter(logging.NoLog{}, mockAppSender) + peers := &Peers{} + for _, peer := range tt.peers { + require.NoError(peers.Connected(context.Background(), peer, nil)) + } + + client, err := r.RegisterAppProtocol(1, nil, peers) + require.NoError(err) + + require.ErrorIs(client.AppRequestAny(context.Background(), []byte("foobar"), nil), tt.expected) + }) + } +} diff --git a/network/p2p/router_test.go b/network/p2p/router_test.go index 3953538b76e..94df6ba1d6e 100644 --- a/network/p2p/router_test.go +++ b/network/p2p/router_test.go @@ -191,9 +191,9 @@ func TestAppRequestResponse(t *testing.T) { sender := common.NewMockSender(ctrl) handler := mocks.NewMockHandler(ctrl) router := NewRouter(logging.NoLog{}, sender) - sampler := &Peers{} - require.NoError(sampler.Connected(context.Background(), nodeID, nil)) - client, err := router.RegisterAppProtocol(handlerID, handler, sampler) + peers := &Peers{} + require.NoError(peers.Connected(context.Background(), nodeID, nil)) + client, err := router.RegisterAppProtocol(handlerID, handler, peers) require.NoError(err) wg := &sync.WaitGroup{} @@ -329,9 +329,9 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { }).AnyTimes() sender.EXPECT().SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), response) - sampler := &Peers{} - require.NoError(sampler.Connected(context.Background(), nodeID, nil)) - client, err := router.RegisterAppProtocol(0x1, handler, sampler) + peers := &Peers{} + require.NoError(peers.Connected(context.Background(), nodeID, nil)) + client, err := router.RegisterAppProtocol(0x1, handler, peers) require.NoError(err) require.NoError(client.AppRequest(context.Background(), set.Of(nodeID), []byte{}, nil)) diff --git a/network/p2p/validators.go b/network/p2p/validators.go new file mode 100644 index 00000000000..0960a292242 --- /dev/null +++ b/network/p2p/validators.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "sync" + "time" + + "golang.org/x/exp/maps" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/sampler" +) + +const maxValidatorSetStaleness = time.Minute + +var _ NodeSampler = (*Validators)(nil) + +func NewValidators(subnetID ids.ID, validators validators.State) *Validators { + return &Validators{ + subnetID: subnetID, + validators: validators, + maxValidatorSetStaleness: maxValidatorSetStaleness, + } +} + +// Validators contains a set of nodes that are staking. +type Validators struct { + subnetID ids.ID + validators validators.State + + lock sync.Mutex + validatorIDs []ids.NodeID + lastUpdated time.Time + maxValidatorSetStaleness time.Duration +} + +func (v *Validators) refresh(ctx context.Context) { + if time.Since(v.lastUpdated) < v.maxValidatorSetStaleness { + return + } + + height, err := v.validators.GetCurrentHeight(ctx) + if err != nil { + return + } + validatorSet, err := v.validators.GetValidatorSet(ctx, height, v.subnetID) + if err != nil { + return + } + + v.validatorIDs = maps.Keys(validatorSet) +} + +func (v *Validators) Sample(ctx context.Context, limit int) []ids.NodeID { + v.lock.Lock() + defer v.lock.Unlock() + + v.refresh(ctx) + + s := sampler.NewUniform() + s.Initialize(uint64(len(v.validatorIDs))) + + indices, _ := s.Sample(math.Min(len(v.validatorIDs), limit)) + sampled := make([]ids.NodeID, 0, len(indices)) + for _, i := range indices { + sampled = append(sampled, v.validatorIDs[i]) + } + + return sampled +} diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go new file mode 100644 index 00000000000..f22b450c88c --- /dev/null +++ b/network/p2p/validators_test.go @@ -0,0 +1,144 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package p2p + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + + "go.uber.org/mock/gomock" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/math" +) + +// Sample should always return up to [limit] validators, and less if fewer than +// [limit] validators are available. +func TestValidatorsSample(t *testing.T) { + tests := []struct { + name string + limit int + + currentHeight uint64 + getCurrentHeightErr error + + validatorSet map[ids.NodeID]*validators.GetValidatorOutput + getValidatorSetErr error + }{ + { + name: "less than limit validators", + limit: 2, + validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ + ids.GenerateTestNodeID(): nil, + }, + }, + { + name: "equal to limit validators", + limit: 2, + validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ + ids.GenerateTestNodeID(): nil, + ids.GenerateTestNodeID(): nil, + }, + }, + { + name: "greater than limit validators", + limit: 2, + validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ + ids.GenerateTestNodeID(): nil, + ids.GenerateTestNodeID(): nil, + ids.GenerateTestNodeID(): nil, + }, + }, + { + name: "fail to get current height", + limit: 2, + getCurrentHeightErr: errors.New("foobar"), + }, + { + name: "fail to get validator set", + limit: 2, + getCurrentHeightErr: errors.New("foobar"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + subnetID := ids.GenerateTestID() + + mockValidators := validators.NewMockState(ctrl) + mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(tt.currentHeight, tt.getCurrentHeightErr) + + getValidatorSetExpectedCalls := 0 + if tt.getCurrentHeightErr == nil { + getValidatorSetExpectedCalls = 1 + } + mockValidators.EXPECT().GetValidatorSet(gomock.Any(), tt.currentHeight, subnetID).Return(tt.validatorSet, tt.getValidatorSetErr).Times(getValidatorSetExpectedCalls) + + v := NewValidators(subnetID, mockValidators) + + sampled := v.Sample(context.Background(), tt.limit) + require.Len(sampled, math.Min(tt.limit, len(tt.validatorSet))) + require.Subset(maps.Keys(tt.validatorSet), sampled) + }) + } +} + +// invariant: we should only call GetValidatorSet when it passes a max staleness +// threshold +func TestValidatorsSampleCaching(t *testing.T) { + tests := []struct { + name string + validators []ids.NodeID + maxStaleness time.Duration + elapsed time.Duration + expectedCalls int + }{ + { + name: "within max threshold", + validators: []ids.NodeID{ids.GenerateTestNodeID()}, + maxStaleness: time.Hour, + elapsed: time.Second, + }, + { + name: "beyond max threshold", + validators: []ids.NodeID{ids.GenerateTestNodeID()}, + maxStaleness: time.Hour, + elapsed: time.Hour + 1, + expectedCalls: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + subnetID := ids.GenerateTestID() + + height := uint64(1234) + mockValidators := validators.NewMockState(ctrl) + mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil).Times(tt.expectedCalls) + + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) + for _, validator := range tt.validators { + validatorSet[validator] = nil + } + mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) + + v := NewValidators(subnetID, mockValidators) + v.maxValidatorSetStaleness = tt.maxStaleness + v.lastUpdated = time.Now().Add(-tt.elapsed) + + v.Sample(context.Background(), 1) + }) + } +} From b8324d12ab6630e1e5da3b9bbb5ce83ebcc26963 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 23:01:09 -0400 Subject: [PATCH 12/22] nits --- network/p2p/peers_test.go | 1 + network/p2p/router_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/network/p2p/peers_test.go b/network/p2p/peers_test.go index ba428971cb0..48c62f73a89 100644 --- a/network/p2p/peers_test.go +++ b/network/p2p/peers_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "github.com/ava-labs/avalanchego/ids" diff --git a/network/p2p/router_test.go b/network/p2p/router_test.go index 94df6ba1d6e..552e7cbf510 100644 --- a/network/p2p/router_test.go +++ b/network/p2p/router_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "github.com/ava-labs/avalanchego/ids" From 2aa0d20c62f0e43a46367fc1d988118995a59244 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 23:03:11 -0400 Subject: [PATCH 13/22] nit --- network/p2p/validators_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go index f22b450c88c..a11131b93df 100644 --- a/network/p2p/validators_test.go +++ b/network/p2p/validators_test.go @@ -10,9 +10,8 @@ import ( "time" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "go.uber.org/mock/gomock" + "golang.org/x/exp/maps" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" From 1809affe9a2479e6a48fb463b30fd63d6c685bce Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 23:29:57 -0400 Subject: [PATCH 14/22] nit --- network/p2p/peers_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/p2p/peers_test.go b/network/p2p/peers_test.go index 48c62f73a89..98e8b427f92 100644 --- a/network/p2p/peers_test.go +++ b/network/p2p/peers_test.go @@ -141,7 +141,8 @@ func TestAppRequestAnyNodeSelection(t *testing.T) { client, err := r.RegisterAppProtocol(1, nil, peers) require.NoError(err) - require.ErrorIs(client.AppRequestAny(context.Background(), []byte("foobar"), nil), tt.expected) + err = client.AppRequestAny(context.Background(), []byte("foobar"), nil) + require.ErrorIs(err, ErrNoPeers) }) } } From da20c257d37f7e1cc3850f78163c2c322412ca29 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 21 Aug 2023 23:40:14 -0400 Subject: [PATCH 15/22] lol --- network/p2p/peers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/peers_test.go b/network/p2p/peers_test.go index 98e8b427f92..e947a5e6b36 100644 --- a/network/p2p/peers_test.go +++ b/network/p2p/peers_test.go @@ -142,7 +142,7 @@ func TestAppRequestAnyNodeSelection(t *testing.T) { require.NoError(err) err = client.AppRequestAny(context.Background(), []byte("foobar"), nil) - require.ErrorIs(err, ErrNoPeers) + require.ErrorIs(err, tt.expected) }) } } From 956efba9a46b58e8cc41a84ce844dcc8cc060741 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 22 Aug 2023 00:06:38 -0400 Subject: [PATCH 16/22] nit --- network/p2p/node_sampler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/node_sampler.go b/network/p2p/node_sampler.go index cab7a4d24fe..057a175027a 100644 --- a/network/p2p/node_sampler.go +++ b/network/p2p/node_sampler.go @@ -9,7 +9,7 @@ import ( "github.com/ava-labs/avalanchego/ids" ) -// NodeSampler samples members of the p2p network. +// NodeSampler samples nodes in network type NodeSampler interface { // Sample returns at most [limit] nodes. This may return fewer nodes if // fewer than [limit] are available. From 868893bca292f977c7f6796fa57749b128659848 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 22 Aug 2023 01:17:22 -0400 Subject: [PATCH 17/22] nit --- network/p2p/validators.go | 1 + 1 file changed, 1 insertion(+) diff --git a/network/p2p/validators.go b/network/p2p/validators.go index 0960a292242..28e484a8300 100644 --- a/network/p2p/validators.go +++ b/network/p2p/validators.go @@ -54,6 +54,7 @@ func (v *Validators) refresh(ctx context.Context) { } v.validatorIDs = maps.Keys(validatorSet) + v.lastUpdated = time.Now() } func (v *Validators) Sample(ctx context.Context, limit int) []ids.NodeID { From ab45557707038b477c3083e06caef4c51cc203fd Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 22 Aug 2023 02:23:10 -0400 Subject: [PATCH 18/22] nit --- network/p2p/validators.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/network/p2p/validators.go b/network/p2p/validators.go index 28e484a8300..0248bd17d51 100644 --- a/network/p2p/validators.go +++ b/network/p2p/validators.go @@ -8,12 +8,9 @@ import ( "sync" "time" - "golang.org/x/exp/maps" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/utils/math" - "github.com/ava-labs/avalanchego/utils/sampler" + "github.com/ava-labs/avalanchego/utils/set" ) const maxValidatorSetStaleness = time.Minute @@ -34,7 +31,7 @@ type Validators struct { validators validators.State lock sync.Mutex - validatorIDs []ids.NodeID + validatorIDs set.SampleableSet[ids.NodeID] lastUpdated time.Time maxValidatorSetStaleness time.Duration } @@ -53,7 +50,10 @@ func (v *Validators) refresh(ctx context.Context) { return } - v.validatorIDs = maps.Keys(validatorSet) + v.validatorIDs = set.NewSampleableSet[ids.NodeID](len(validatorSet)) + for nodeID := range validatorSet { + v.validatorIDs.Add(nodeID) + } v.lastUpdated = time.Now() } @@ -63,14 +63,14 @@ func (v *Validators) Sample(ctx context.Context, limit int) []ids.NodeID { v.refresh(ctx) - s := sampler.NewUniform() - s.Initialize(uint64(len(v.validatorIDs))) + return v.validatorIDs.Sample(limit) +} - indices, _ := s.Sample(math.Min(len(v.validatorIDs), limit)) - sampled := make([]ids.NodeID, 0, len(indices)) - for _, i := range indices { - sampled = append(sampled, v.validatorIDs[i]) - } +func (v *Validators) Has(ctx context.Context, nodeID ids.NodeID) bool { + v.lock.Lock() + defer v.lock.Unlock() + + v.refresh(ctx) - return sampled + return v.validatorIDs.Contains(nodeID) } From 0187c3423afcfdb3a79bb432880365b894d11f9c Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Wed, 23 Aug 2023 11:13:26 -0400 Subject: [PATCH 19/22] refactor unit test --- network/p2p/validators_test.go | 218 ++++++++++++++++++++------------- 1 file changed, 131 insertions(+), 87 deletions(-) diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go index a11131b93df..c5584059617 100644 --- a/network/p2p/validators_test.go +++ b/network/p2p/validators_test.go @@ -11,133 +11,177 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "golang.org/x/exp/maps" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/utils/math" ) -// Sample should always return up to [limit] validators, and less if fewer than -// [limit] validators are available. func TestValidatorsSample(t *testing.T) { - tests := []struct { - name string + errFoobar := errors.New("foobar") + nodeID1 := ids.GenerateTestNodeID() + nodeID2 := ids.GenerateTestNodeID() + + type call struct { limit int - currentHeight uint64 + time time.Time + + height uint64 getCurrentHeightErr error - validatorSet map[ids.NodeID]*validators.GetValidatorOutput + validators []ids.NodeID getValidatorSetErr error + + // superset of possible values in the result + expected []ids.NodeID + } + + tests := []struct { + name string + maxStaleness time.Duration + calls []call }{ { - name: "less than limit validators", - limit: 2, - validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ - ids.GenerateTestNodeID(): nil, + // if we don't have as many validators as requested by the caller, + // we should return all the validators we have + name: "less than limit validators", + maxStaleness: time.Hour, + calls: []call{ + { + time: time.Time{}.Add(time.Second), + limit: 2, + height: 1, + validators: []ids.NodeID{nodeID1}, + expected: []ids.NodeID{nodeID1}, + }, }, }, { - name: "equal to limit validators", - limit: 2, - validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ - ids.GenerateTestNodeID(): nil, - ids.GenerateTestNodeID(): nil, + // if we have as many validators as requested by the caller, we + // should return all the validators we have + name: "equal to limit validators", + maxStaleness: time.Hour, + calls: []call{ + { + time: time.Time{}.Add(time.Second), + limit: 1, + height: 1, + validators: []ids.NodeID{nodeID1}, + expected: []ids.NodeID{nodeID1}, + }, }, }, { - name: "greater than limit validators", - limit: 2, - validatorSet: map[ids.NodeID]*validators.GetValidatorOutput{ - ids.GenerateTestNodeID(): nil, - ids.GenerateTestNodeID(): nil, - ids.GenerateTestNodeID(): nil, + // if we have less validators than requested by the caller, we + // should return a subset of the validators that we have + name: "less than limit validators", + maxStaleness: time.Hour, + calls: []call{ + { + time: time.Time{}.Add(time.Second), + limit: 1, + height: 1, + validators: []ids.NodeID{nodeID1, nodeID2}, + expected: []ids.NodeID{nodeID1, nodeID2}, + }, }, }, { - name: "fail to get current height", - limit: 2, - getCurrentHeightErr: errors.New("foobar"), + name: "within max staleness threshold", + maxStaleness: time.Hour, + calls: []call{ + { + time: time.Time{}.Add(time.Second), + limit: 1, + height: 1, + validators: []ids.NodeID{nodeID1}, + expected: []ids.NodeID{nodeID1}, + }, + }, }, { - name: "fail to get validator set", - limit: 2, - getCurrentHeightErr: errors.New("foobar"), + name: "beyond max staleness threshold", + maxStaleness: time.Hour, + calls: []call{ + { + limit: 1, + time: time.Time{}.Add(time.Hour), + height: 1, + validators: []ids.NodeID{nodeID1}, + expected: []ids.NodeID{nodeID1}, + }, + }, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - ctrl := gomock.NewController(t) - - subnetID := ids.GenerateTestID() - - mockValidators := validators.NewMockState(ctrl) - mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(tt.currentHeight, tt.getCurrentHeightErr) - - getValidatorSetExpectedCalls := 0 - if tt.getCurrentHeightErr == nil { - getValidatorSetExpectedCalls = 1 - } - mockValidators.EXPECT().GetValidatorSet(gomock.Any(), tt.currentHeight, subnetID).Return(tt.validatorSet, tt.getValidatorSetErr).Times(getValidatorSetExpectedCalls) - - v := NewValidators(subnetID, mockValidators) - - sampled := v.Sample(context.Background(), tt.limit) - require.Len(sampled, math.Min(tt.limit, len(tt.validatorSet))) - require.Subset(maps.Keys(tt.validatorSet), sampled) - }) - } -} - -// invariant: we should only call GetValidatorSet when it passes a max staleness -// threshold -func TestValidatorsSampleCaching(t *testing.T) { - tests := []struct { - name string - validators []ids.NodeID - maxStaleness time.Duration - elapsed time.Duration - expectedCalls int - }{ { - name: "within max threshold", - validators: []ids.NodeID{ids.GenerateTestNodeID()}, - maxStaleness: time.Hour, - elapsed: time.Second, + name: "fail to get current height", + maxStaleness: time.Second, + calls: []call{ + { + limit: 1, + time: time.Time{}.Add(time.Hour), + getCurrentHeightErr: errFoobar, + expected: []ids.NodeID{}, + }, + }, }, { - name: "beyond max threshold", - validators: []ids.NodeID{ids.GenerateTestNodeID()}, - maxStaleness: time.Hour, - elapsed: time.Hour + 1, - expectedCalls: 1, + name: "second get validator set call fails", + maxStaleness: time.Minute, + calls: []call{ + { + limit: 1, + time: time.Time{}.Add(time.Second), + height: 1, + validators: []ids.NodeID{nodeID1}, + expected: []ids.NodeID{nodeID1}, + }, + { + limit: 1, + time: time.Time{}.Add(time.Hour), + height: 1, + getValidatorSetErr: errFoobar, + expected: []ids.NodeID{}, + }, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + require := require.New(t) ctrl := gomock.NewController(t) subnetID := ids.GenerateTestID() - - height := uint64(1234) mockValidators := validators.NewMockState(ctrl) - mockValidators.EXPECT().GetCurrentHeight(gomock.Any()).Return(height, nil).Times(tt.expectedCalls) - validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) - for _, validator := range tt.validators { - validatorSet[validator] = nil - } - mockValidators.EXPECT().GetValidatorSet(gomock.Any(), height, subnetID).Return(validatorSet, nil).Times(tt.expectedCalls) + calls := make([]*gomock.Call, 0) + for _, call := range tt.calls { + calls = append(calls, mockValidators.EXPECT(). + GetCurrentHeight(gomock.Any()).Return(call.height, call.getCurrentHeightErr)) + + if call.getCurrentHeightErr != nil { + continue + } - v := NewValidators(subnetID, mockValidators) - v.maxValidatorSetStaleness = tt.maxStaleness - v.lastUpdated = time.Now().Add(-tt.elapsed) + validatorSet := make(map[ids.NodeID]*validators.GetValidatorOutput, 0) + for _, validator := range call.validators { + validatorSet[validator] = nil + } - v.Sample(context.Background(), 1) + calls = append(calls, + mockValidators.EXPECT(). + GetValidatorSet(gomock.Any(), gomock.Any(), subnetID). + Return(validatorSet, call.getValidatorSetErr)) + } + gomock.InOrder(calls...) + + v := NewValidators(subnetID, mockValidators, tt.maxStaleness) + for _, call := range tt.calls { + v.lastUpdated = call.time + sampled := v.Sample(context.Background(), call.limit) + require.LessOrEqual(len(sampled), call.limit) + require.Subset(call.expected, sampled) + } }) } } From 0ae66591f1360c283d56e0c44b68d48916e8daa0 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Wed, 23 Aug 2023 11:13:42 -0400 Subject: [PATCH 20/22] nits --- network/p2p/client.go | 3 ++- network/p2p/validators.go | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/network/p2p/client.go b/network/p2p/client.go index 4fae1cbe53f..3c20988e128 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -41,7 +41,8 @@ type Client struct { handlerPrefix []byte router *Router sender common.AppSender - nodeSampler NodeSampler + // nodeSampler is used to select nodes to route AppRequestAny to + nodeSampler NodeSampler } // AppRequestAny issues an AppRequest to an arbitrary node decided by Client. diff --git a/network/p2p/validators.go b/network/p2p/validators.go index 0248bd17d51..830e3d93ea2 100644 --- a/network/p2p/validators.go +++ b/network/p2p/validators.go @@ -13,11 +13,9 @@ import ( "github.com/ava-labs/avalanchego/utils/set" ) -const maxValidatorSetStaleness = time.Minute - var _ NodeSampler = (*Validators)(nil) -func NewValidators(subnetID ids.ID, validators validators.State) *Validators { +func NewValidators(subnetID ids.ID, validators validators.State, maxValidatorSetStaleness time.Duration) *Validators { return &Validators{ subnetID: subnetID, validators: validators, @@ -41,6 +39,8 @@ func (v *Validators) refresh(ctx context.Context) { return } + v.validatorIDs.Clear() + height, err := v.validators.GetCurrentHeight(ctx) if err != nil { return @@ -50,10 +50,10 @@ func (v *Validators) refresh(ctx context.Context) { return } - v.validatorIDs = set.NewSampleableSet[ids.NodeID](len(validatorSet)) for nodeID := range validatorSet { v.validatorIDs.Add(nodeID) } + v.lastUpdated = time.Now() } From 386f96aefac028b3541c3382d9d80aa709b2141d Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Wed, 23 Aug 2023 12:23:23 -0400 Subject: [PATCH 21/22] nit --- network/p2p/validators.go | 9 ++++++++- network/p2p/validators_test.go | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/network/p2p/validators.go b/network/p2p/validators.go index 830e3d93ea2..c13752b686b 100644 --- a/network/p2p/validators.go +++ b/network/p2p/validators.go @@ -8,15 +8,19 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" ) var _ NodeSampler = (*Validators)(nil) -func NewValidators(subnetID ids.ID, validators validators.State, maxValidatorSetStaleness time.Duration) *Validators { +func NewValidators(log logging.Logger, subnetID ids.ID, validators validators.State, maxValidatorSetStaleness time.Duration) *Validators { return &Validators{ + log: log, subnetID: subnetID, validators: validators, maxValidatorSetStaleness: maxValidatorSetStaleness, @@ -25,6 +29,7 @@ func NewValidators(subnetID ids.ID, validators validators.State, maxValidatorSet // Validators contains a set of nodes that are staking. type Validators struct { + log logging.Logger subnetID ids.ID validators validators.State @@ -43,10 +48,12 @@ func (v *Validators) refresh(ctx context.Context) { height, err := v.validators.GetCurrentHeight(ctx) if err != nil { + v.log.Warn("failed to get current height", zap.Error(err)) return } validatorSet, err := v.validators.GetValidatorSet(ctx, height, v.subnetID) if err != nil { + v.log.Warn("failed to get validator set", zap.Error(err)) return } diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go index c5584059617..a72f704780a 100644 --- a/network/p2p/validators_test.go +++ b/network/p2p/validators_test.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/logging" ) func TestValidatorsSample(t *testing.T) { @@ -175,7 +176,7 @@ func TestValidatorsSample(t *testing.T) { } gomock.InOrder(calls...) - v := NewValidators(subnetID, mockValidators, tt.maxStaleness) + v := NewValidators(logging.NoLog{}, subnetID, mockValidators, tt.maxStaleness) for _, call := range tt.calls { v.lastUpdated = call.time sampled := v.Sample(context.Background(), call.limit) From af938d55e8855e05119bc982f731637df233510d Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 23 Aug 2023 14:20:56 -0400 Subject: [PATCH 22/22] Update network/p2p/validators_test.go Signed-off-by: Stephen Buttolph --- network/p2p/validators_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go index a72f704780a..5db06f7a2ef 100644 --- a/network/p2p/validators_test.go +++ b/network/p2p/validators_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "github.com/ava-labs/avalanchego/ids"