diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel b/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel index b70773404e90..e9ee37d4291c 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel @@ -79,12 +79,7 @@ go_test( "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/core/altair:go_default_library", "//beacon-chain/core/epoch/precompute:go_default_library", - "//beacon-chain/core/feed:go_default_library", - "//beacon-chain/core/feed/block:go_default_library", - "//beacon-chain/core/feed/operation:go_default_library", - "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", @@ -111,14 +106,11 @@ go_test( "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/attestation:go_default_library", - "//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library", "//testing/assert:go_default_library", - "//testing/mock:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", - "@com_github_golang_mock//gomock:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@in_gopkg_d4l3k_messagediff_v1//:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/attestations_test.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/attestations_test.go index 5da7d482a75b..be9ef9238967 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/attestations_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/attestations_test.go @@ -8,13 +8,9 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/prysmaticlabs/go-bitfield" chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" dbTest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" @@ -29,14 +25,11 @@ import ( "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation" - attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations" "github.com/prysmaticlabs/prysm/v5/testing/assert" - "github.com/prysmaticlabs/prysm/v5/testing/mock" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" "github.com/prysmaticlabs/prysm/v5/time/slots" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/emptypb" ) func TestServer_ListAttestations_NoResults(t *testing.T) { @@ -824,239 +817,3 @@ func TestServer_AttestationPool_Pagination_CustomPageSize(t *testing.T) { assert.Equal(t, tt.res.NextPageToken, res.NextPageToken, "Unexpected next page token") } } - -func TestServer_StreamIndexedAttestations_ContextCanceled(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - chainService := &chainMock.ChainService{} - server := &Server{ - Ctx: ctx, - AttestationNotifier: chainService.OperationNotifier(), - GenesisTimeFetcher: &chainMock.ChainService{ - Genesis: time.Now(), - }, - } - - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamIndexedAttestationsServer(ctrl) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - go func(tt *testing.T) { - err := server.StreamIndexedAttestations(&emptypb.Empty{}, mockStream) - assert.ErrorContains(t, "Context canceled", err) - <-exitRoutine - }(t) - cancel() - exitRoutine <- true -} - -func TestServer_StreamIndexedAttestations_OK(t *testing.T) { - params.SetupTestConfigCleanup(t) - params.OverrideBeaconConfig(params.BeaconConfig()) - db := dbTest.SetupDB(t) - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - ctx := context.Background() - - numValidators := 64 - headState, privKeys := util.DeterministicGenesisState(t, uint64(numValidators)) - b := util.NewBeaconBlock() - util.SaveBlock(t, ctx, db, b) - gRoot, err := b.Block.HashTreeRoot() - require.NoError(t, err) - require.NoError(t, db.SaveGenesisBlockRoot(ctx, gRoot)) - require.NoError(t, db.SaveState(ctx, headState, gRoot)) - - activeIndices, err := helpers.ActiveValidatorIndices(ctx, headState, 0) - require.NoError(t, err) - epoch := primitives.Epoch(0) - attesterSeed, err := helpers.Seed(headState, epoch, params.BeaconConfig().DomainBeaconAttester) - require.NoError(t, err) - committees, err := computeCommittees(context.Background(), params.BeaconConfig().SlotsPerEpoch.Mul(uint64(epoch)), activeIndices, attesterSeed) - require.NoError(t, err) - - count := params.BeaconConfig().SlotsPerEpoch - // We generate attestations for each validator per slot per epoch. - atts := make(map[[32]byte][]*ethpb.Attestation) - for i := primitives.Slot(0); i < count; i++ { - comms := committees[i].Committees - for j := 0; j < numValidators; j++ { - var indexInCommittee uint64 - var committeeIndex primitives.CommitteeIndex - var committeeLength int - var found bool - for comIndex, item := range comms { - for n, idx := range item.ValidatorIndices { - if primitives.ValidatorIndex(j) == idx { - indexInCommittee = uint64(n) - committeeIndex = primitives.CommitteeIndex(comIndex) - committeeLength = len(item.ValidatorIndices) - found = true - break - } - } - } - if !found { - continue - } - attExample := ðpb.Attestation{ - Data: ðpb.AttestationData{ - BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32), - Slot: i, - Source: ðpb.Checkpoint{ - Epoch: 0, - Root: gRoot[:], - }, - Target: ðpb.Checkpoint{ - Epoch: 0, - Root: gRoot[:], - }, - }, - } - domain, err := signing.Domain(headState.Fork(), 0, params.BeaconConfig().DomainBeaconAttester, headState.GenesisValidatorsRoot()) - require.NoError(t, err) - encoded, err := signing.ComputeSigningRoot(attExample.Data, domain) - require.NoError(t, err) - sig := privKeys[j].Sign(encoded[:]) - attExample.Signature = sig.Marshal() - attExample.Data.CommitteeIndex = committeeIndex - aggregationBitfield := bitfield.NewBitlist(uint64(committeeLength)) - aggregationBitfield.SetBitAt(indexInCommittee, true) - attExample.AggregationBits = aggregationBitfield - atts[encoded] = append(atts[encoded], attExample) - } - } - - chainService := &chainMock.ChainService{} - server := &Server{ - BeaconDB: db, - Ctx: context.Background(), - HeadFetcher: &chainMock.ChainService{ - State: headState, - }, - GenesisTimeFetcher: &chainMock.ChainService{ - Genesis: time.Now(), - }, - AttestationNotifier: chainService.OperationNotifier(), - CollectedAttestationsBuffer: make(chan []*ethpb.Attestation, 1), - StateGen: stategen.New(db, doublylinkedtree.New()), - } - - for dataRoot, sameDataAtts := range atts { - aggAtts, err := attaggregation.Aggregate(sameDataAtts) - require.NoError(t, err) - atts[dataRoot] = aggAtts - } - - // Next up we convert the test attestations to indexed form. - attsByTarget := make(map[[32]byte][]*ethpb.Attestation) - for _, dataRootAtts := range atts { - targetRoot := bytesutil.ToBytes32(dataRootAtts[0].Data.Target.Root) - attsByTarget[targetRoot] = append(attsByTarget[targetRoot], dataRootAtts...) - } - - allAtts := make([]*ethpb.Attestation, 0) - indexedAtts := make(map[[32]byte][]*ethpb.IndexedAttestation) - for dataRoot, aggAtts := range attsByTarget { - allAtts = append(allAtts, aggAtts...) - for _, att := range aggAtts { - committee := committees[att.Data.Slot].Committees[att.Data.CommitteeIndex] - idxAtt, err := attestation.ConvertToIndexed(ctx, att, committee.ValidatorIndices) - require.NoError(t, err) - indexedAtts[dataRoot] = append(indexedAtts[dataRoot], idxAtt) - } - } - - attsSent := 0 - mockStream := mock.NewMockBeaconChain_StreamIndexedAttestationsServer(ctrl) - for _, atts := range indexedAtts { - for _, att := range atts { - if attsSent == len(allAtts)-1 { - mockStream.EXPECT().Send(att).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - t.Log("cancelled") - } else { - mockStream.EXPECT().Send(att) - attsSent++ - } - } - } - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - - go func(tt *testing.T) { - assert.NoError(tt, server.StreamIndexedAttestations(&emptypb.Empty{}, mockStream), "Could not call RPC method") - }(t) - - server.CollectedAttestationsBuffer <- allAtts - <-exitRoutine -} - -func TestServer_StreamAttestations_ContextCanceled(t *testing.T) { - ctx := context.Background() - - ctx, cancel := context.WithCancel(ctx) - chainService := &chainMock.ChainService{} - server := &Server{ - Ctx: ctx, - AttestationNotifier: chainService.OperationNotifier(), - } - - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamAttestationsServer(ctrl) - mockStream.EXPECT().Context().Return(ctx) - go func(tt *testing.T) { - err := server.StreamAttestations( - &emptypb.Empty{}, - mockStream, - ) - assert.ErrorContains(tt, "Context canceled", err) - <-exitRoutine - }(t) - cancel() - exitRoutine <- true -} - -func TestServer_StreamAttestations_OnSlotTick(t *testing.T) { - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - ctx := context.Background() - chainService := &chainMock.ChainService{} - server := &Server{ - Ctx: ctx, - AttestationNotifier: chainService.OperationNotifier(), - } - - atts := []*ethpb.Attestation{ - util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}}), - util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}}), - util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}}), - } - - mockStream := mock.NewMockBeaconChain_StreamAttestationsServer(ctrl) - mockStream.EXPECT().Send(atts[0]) - mockStream.EXPECT().Send(atts[1]) - mockStream.EXPECT().Send(atts[2]).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - - go func(tt *testing.T) { - assert.NoError(tt, server.StreamAttestations(&emptypb.Empty{}, mockStream), "Could not call RPC method") - }(t) - for i := 0; i < len(atts); i++ { - // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). - for sent := 0; sent == 0; { - sent = server.AttestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{Attestation: atts[i]}, - }) - } - } - <-exitRoutine -} diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go index 9f8310024281..613e8d26e7d7 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go @@ -6,14 +6,9 @@ import ( "strconv" "testing" - "github.com/golang/mock/gomock" chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" - blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" - statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" dbTest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" state_native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native" - mockSync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync/testing" "github.com/prysmaticlabs/prysm/v5/config/features" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" @@ -23,12 +18,10 @@ import ( "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" - "github.com/prysmaticlabs/prysm/v5/testing/mock" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" "github.com/prysmaticlabs/prysm/v5/time/slots" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/emptypb" ) // ensures that if any of the checkpoints are zero-valued, an error will be generated without genesis being present @@ -202,271 +195,6 @@ func TestServer_GetChainHead(t *testing.T) { assert.Equal(t, false, head.OptimisticStatus) } -func TestServer_StreamChainHead_ContextCanceled(t *testing.T) { - db := dbTest.SetupDB(t) - ctx := context.Background() - - ctx, cancel := context.WithCancel(ctx) - chainService := &chainMock.ChainService{} - server := &Server{ - Ctx: ctx, - StateNotifier: chainService.StateNotifier(), - BeaconDB: db, - } - - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamChainHeadServer(ctrl) - mockStream.EXPECT().Context().Return(ctx) - go func(tt *testing.T) { - assert.ErrorContains(tt, "Context canceled", server.StreamChainHead(&emptypb.Empty{}, mockStream)) - <-exitRoutine - }(t) - cancel() - exitRoutine <- true -} - -func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { - params.SetupTestConfigCleanup(t) - params.OverrideBeaconConfig(params.MainnetConfig()) - db := dbTest.SetupDB(t) - genBlock := util.NewBeaconBlock() - genBlock.Block.ParentRoot = bytesutil.PadTo([]byte{'G'}, fieldparams.RootLength) - util.SaveBlock(t, context.Background(), db, genBlock) - gRoot, err := genBlock.Block.HashTreeRoot() - require.NoError(t, err) - require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), gRoot)) - - finalizedBlock := util.NewBeaconBlock() - finalizedBlock.Block.Slot = 32 - finalizedBlock.Block.ParentRoot = bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength) - util.SaveBlock(t, context.Background(), db, finalizedBlock) - fRoot, err := finalizedBlock.Block.HashTreeRoot() - require.NoError(t, err) - - justifiedBlock := util.NewBeaconBlock() - justifiedBlock.Block.Slot = 64 - justifiedBlock.Block.ParentRoot = bytesutil.PadTo([]byte{'B'}, fieldparams.RootLength) - util.SaveBlock(t, context.Background(), db, justifiedBlock) - jRoot, err := justifiedBlock.Block.HashTreeRoot() - require.NoError(t, err) - - prevJustifiedBlock := util.NewBeaconBlock() - prevJustifiedBlock.Block.Slot = 96 - prevJustifiedBlock.Block.ParentRoot = bytesutil.PadTo([]byte{'C'}, fieldparams.RootLength) - util.SaveBlock(t, context.Background(), db, prevJustifiedBlock) - pjRoot, err := prevJustifiedBlock.Block.HashTreeRoot() - require.NoError(t, err) - - s, err := state_native.InitializeFromProtoPhase0(ðpb.BeaconState{ - Slot: 1, - PreviousJustifiedCheckpoint: ðpb.Checkpoint{Epoch: 3, Root: pjRoot[:]}, - CurrentJustifiedCheckpoint: ðpb.Checkpoint{Epoch: 2, Root: jRoot[:]}, - FinalizedCheckpoint: ðpb.Checkpoint{Epoch: 1, Root: fRoot[:]}, - }) - require.NoError(t, err) - - b := util.NewBeaconBlock() - b.Block.Slot, err = slots.EpochStart(s.PreviousJustifiedCheckpoint().Epoch) - require.NoError(t, err) - - hRoot, err := b.Block.HashTreeRoot() - require.NoError(t, err) - - chainService := &chainMock.ChainService{} - ctx := context.Background() - wsb, err := blocks.NewSignedBeaconBlock(b) - require.NoError(t, err) - server := &Server{ - Ctx: ctx, - HeadFetcher: &chainMock.ChainService{Block: wsb, State: s}, - BeaconDB: db, - StateNotifier: chainService.StateNotifier(), - FinalizationFetcher: &chainMock.ChainService{ - FinalizedCheckPoint: s.FinalizedCheckpoint(), - CurrentJustifiedCheckPoint: s.CurrentJustifiedCheckpoint(), - PreviousJustifiedCheckPoint: s.PreviousJustifiedCheckpoint()}, - OptimisticModeFetcher: &chainMock.ChainService{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - } - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamChainHeadServer(ctrl) - mockStream.EXPECT().Send( - ðpb.ChainHead{ - HeadSlot: b.Block.Slot, - HeadEpoch: slots.ToEpoch(b.Block.Slot), - HeadBlockRoot: hRoot[:], - FinalizedSlot: 32, - FinalizedEpoch: 1, - FinalizedBlockRoot: fRoot[:], - JustifiedSlot: 64, - JustifiedEpoch: 2, - JustifiedBlockRoot: jRoot[:], - PreviousJustifiedSlot: 96, - PreviousJustifiedEpoch: 3, - PreviousJustifiedBlockRoot: pjRoot[:], - }, - ).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - - go func(tt *testing.T) { - assert.NoError(tt, server.StreamChainHead(&emptypb.Empty{}, mockStream), "Could not call RPC method") - }(t) - - // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). - for sent := 0; sent == 0; { - sent = server.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.BlockProcessed, - Data: &statefeed.BlockProcessedData{}, - }) - } - <-exitRoutine -} - -func TestServer_StreamBlocksVerified_ContextCanceled(t *testing.T) { - db := dbTest.SetupDB(t) - ctx := context.Background() - - chainService := &chainMock.ChainService{} - ctx, cancel := context.WithCancel(ctx) - server := &Server{ - Ctx: ctx, - StateNotifier: chainService.StateNotifier(), - HeadFetcher: chainService, - BeaconDB: db, - } - - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl) - mockStream.EXPECT().Context().Return(ctx) - go func(tt *testing.T) { - assert.ErrorContains(tt, "Context canceled", server.StreamBlocks(ðpb.StreamBlocksRequest{ - VerifiedOnly: true, - }, mockStream)) - <-exitRoutine - }(t) - cancel() - exitRoutine <- true -} - -func TestServer_StreamBlocks_ContextCanceled(t *testing.T) { - db := dbTest.SetupDB(t) - ctx := context.Background() - - chainService := &chainMock.ChainService{} - ctx, cancel := context.WithCancel(ctx) - server := &Server{ - Ctx: ctx, - BlockNotifier: chainService.BlockNotifier(), - HeadFetcher: chainService, - BeaconDB: db, - } - - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl) - mockStream.EXPECT().Context().Return(ctx) - go func(tt *testing.T) { - assert.ErrorContains(tt, "Context canceled", server.StreamBlocks(ðpb.StreamBlocksRequest{}, mockStream)) - <-exitRoutine - }(t) - cancel() - exitRoutine <- true -} - -func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) { - params.SetupTestConfigCleanup(t) - params.OverrideBeaconConfig(params.BeaconConfig()) - - ctx := context.Background() - beaconState, privs := util.DeterministicGenesisState(t, 32) - b, err := util.GenerateFullBlock(beaconState, privs, util.DefaultBlockGenConfig(), 1) - require.NoError(t, err) - chainService := &chainMock.ChainService{State: beaconState} - server := &Server{ - Ctx: ctx, - BlockNotifier: chainService.BlockNotifier(), - HeadFetcher: chainService, - } - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl) - mockStream.EXPECT().Send(b).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - - go func(tt *testing.T) { - assert.NoError(tt, server.StreamBlocks(ðpb.StreamBlocksRequest{}, mockStream), "Could not call RPC method") - }(t) - - // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). - for sent := 0; sent == 0; { - wsb, err := blocks.NewSignedBeaconBlock(b) - require.NoError(t, err) - sent = server.BlockNotifier.BlockFeed().Send(&feed.Event{ - Type: blockfeed.ReceivedBlock, - Data: &blockfeed.ReceivedBlockData{SignedBlock: wsb}, - }) - } - <-exitRoutine -} - -func TestServer_StreamBlocksVerified_OnHeadUpdated(t *testing.T) { - params.SetupTestConfigCleanup(t) - params.OverrideBeaconConfig(params.BeaconConfig()) - - db := dbTest.SetupDB(t) - ctx := context.Background() - beaconState, privs := util.DeterministicGenesisState(t, 32) - b, err := util.GenerateFullBlock(beaconState, privs, util.DefaultBlockGenConfig(), 1) - require.NoError(t, err) - r, err := b.Block.HashTreeRoot() - require.NoError(t, err) - util.SaveBlock(t, ctx, db, b) - chainService := &chainMock.ChainService{State: beaconState} - server := &Server{ - Ctx: ctx, - StateNotifier: chainService.StateNotifier(), - HeadFetcher: chainService, - BeaconDB: db, - } - exitRoutine := make(chan bool) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl) - mockStream.EXPECT().Send(b).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - - go func(tt *testing.T) { - assert.NoError(tt, server.StreamBlocks(ðpb.StreamBlocksRequest{ - VerifiedOnly: true, - }, mockStream), "Could not call RPC method") - }(t) - - // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). - for sent := 0; sent == 0; { - wsb, err := blocks.NewSignedBeaconBlock(b) - require.NoError(t, err) - sent = server.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.BlockProcessed, - Data: &statefeed.BlockProcessedData{Slot: b.Block.Slot, BlockRoot: r, SignedBlock: wsb}, - }) - } - <-exitRoutine -} - func TestServer_ListBeaconBlocks_NoResults(t *testing.T) { db := dbTest.SetupDB(t) ctx := context.Background() diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/duties_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/duties_test.go index 68a38ec51782..367ecaf72673 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/duties_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/duties_test.go @@ -6,14 +6,11 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" mockChain "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/execution" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" - statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" mockExecution "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing" @@ -22,10 +19,8 @@ import ( "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" - ethpbv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" - "github.com/prysmaticlabs/prysm/v5/testing/mock" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" ) @@ -468,141 +463,6 @@ func TestGetDuties_SyncNotReady(t *testing.T) { assert.ErrorContains(t, "Syncing to latest head", err) } -func TestStreamDuties_SyncNotReady(t *testing.T) { - vs := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: true}, - } - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockStream := mock.NewMockBeaconNodeValidator_StreamDutiesServer(ctrl) - assert.ErrorContains(t, "Syncing to latest head", vs.StreamDuties(ðpb.DutiesRequest{}, mockStream)) -} - -func TestStreamDuties_OK(t *testing.T) { - genesis := util.NewBeaconBlock() - depChainStart := params.BeaconConfig().MinGenesisActiveValidatorCount - deposits, _, err := util.DeterministicDepositsAndKeys(depChainStart) - require.NoError(t, err) - eth1Data, err := util.DeterministicEth1Data(len(deposits)) - require.NoError(t, err) - bs, err := transition.GenesisBeaconState(context.Background(), deposits, 0, eth1Data) - require.NoError(t, err, "Could not setup genesis bs") - genesisRoot, err := genesis.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root") - - pubKeys := make([][]byte, len(deposits)) - indices := make([]uint64, len(deposits)) - for i := 0; i < len(deposits); i++ { - pubKeys[i] = deposits[i].Data.PublicKey - indices[i] = uint64(i) - } - - pubkeysAs48ByteType := make([][fieldparams.BLSPubkeyLength]byte, len(pubKeys)) - for i, pk := range pubKeys { - pubkeysAs48ByteType[i] = bytesutil.ToBytes48(pk) - } - - ctx, cancel := context.WithCancel(context.Background()) - c := &mockChain.ChainService{ - Genesis: time.Now(), - } - vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, - PayloadIDCache: cache.NewPayloadIDCache(), - } - - // Test the first validator in registry. - req := ðpb.DutiesRequest{ - PublicKeys: [][]byte{deposits[0].Data.PublicKey}, - } - wantedRes, err := vs.duties(ctx, req) - require.NoError(t, err) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - exitRoutine := make(chan bool) - mockStream := mock.NewMockBeaconNodeValidator_StreamDutiesServer(ctrl) - mockStream.EXPECT().Send(wantedRes).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - go func(tt *testing.T) { - assert.ErrorContains(t, "context canceled", vs.StreamDuties(req, mockStream)) - }(t) - <-exitRoutine - cancel() -} - -func TestStreamDuties_OK_ChainReorg(t *testing.T) { - genesis := util.NewBeaconBlock() - depChainStart := params.BeaconConfig().MinGenesisActiveValidatorCount - deposits, _, err := util.DeterministicDepositsAndKeys(depChainStart) - require.NoError(t, err) - eth1Data, err := util.DeterministicEth1Data(len(deposits)) - require.NoError(t, err) - bs, err := transition.GenesisBeaconState(context.Background(), deposits, 0, eth1Data) - require.NoError(t, err, "Could not setup genesis bs") - genesisRoot, err := genesis.Block.HashTreeRoot() - require.NoError(t, err, "Could not get signing root") - - pubKeys := make([][]byte, len(deposits)) - indices := make([]uint64, len(deposits)) - for i := 0; i < len(deposits); i++ { - pubKeys[i] = deposits[i].Data.PublicKey - indices[i] = uint64(i) - } - - pubkeysAs48ByteType := make([][fieldparams.BLSPubkeyLength]byte, len(pubKeys)) - for i, pk := range pubKeys { - pubkeysAs48ByteType[i] = bytesutil.ToBytes48(pk) - } - - ctx, cancel := context.WithCancel(context.Background()) - c := &mockChain.ChainService{ - Genesis: time.Now(), - } - vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, - PayloadIDCache: cache.NewPayloadIDCache(), - } - - // Test the first validator in registry. - req := ðpb.DutiesRequest{ - PublicKeys: [][]byte{deposits[0].Data.PublicKey}, - } - wantedRes, err := vs.duties(ctx, req) - require.NoError(t, err) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - exitRoutine := make(chan bool) - mockStream := mock.NewMockBeaconNodeValidator_StreamDutiesServer(ctrl) - mockStream.EXPECT().Send(wantedRes).Return(nil) - mockStream.EXPECT().Send(wantedRes).Do(func(arg0 interface{}) { - exitRoutine <- true - }) - mockStream.EXPECT().Context().Return(ctx).AnyTimes() - go func(tt *testing.T) { - assert.ErrorContains(t, "context canceled", vs.StreamDuties(req, mockStream)) - }(t) - // Fire a reorg event. This needs to trigger - // a recomputation and resending of duties over the stream. - for sent := 0; sent == 0; { - sent = vs.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.Reorg, - Data: ðpbv1.EventChainReorg{Depth: uint64(params.BeaconConfig().SlotsPerEpoch), Slot: 0}, - }) - } - <-exitRoutine - cancel() -} - func BenchmarkCommitteeAssignment(b *testing.B) { genesis := util.NewBeaconBlock()