From a323609c31622e2c359649e9122b746b35cf3d09 Mon Sep 17 00:00:00 2001 From: anshalshukla Date: Thu, 12 Sep 2024 16:04:41 +0530 Subject: [PATCH] add: eth67 protocol --- cmd/devp2p/internal/ethtest/conn.go | 2 +- cmd/devp2p/internal/ethtest/suite.go | 10 +- cmd/devp2p/internal/ethtest/transaction.go | 4 +- eth/downloader/bor_downloader_test.go | 94 ++++- eth/downloader/skeleton_test.go | 417 +++++++++++---------- eth/handler_eth.go | 5 +- eth/handler_eth_test.go | 17 +- eth/protocols/eth/broadcast.go | 13 +- eth/protocols/eth/handler.go | 27 +- eth/protocols/eth/handler_test.go | 5 +- eth/protocols/eth/handlers.go | 26 +- eth/protocols/eth/handshake_test.go | 1 + eth/protocols/eth/peer.go | 28 +- eth/protocols/eth/protocol.go | 18 +- eth/sync_test.go | 1 + 15 files changed, 435 insertions(+), 233 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index 757b137aa1..b3d0c7afa6 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -167,7 +167,7 @@ func (c *Conn) ReadEth() (any, error) { case eth.TransactionsMsg: msg = new(eth.TransactionsPacket) case eth.NewPooledTransactionHashesMsg: - msg = new(eth.NewPooledTransactionHashesPacket) + msg = new(eth.NewPooledTransactionHashesPacket68) case eth.GetPooledTransactionsMsg: msg = new(eth.GetPooledTransactionsPacket) case eth.PooledTransactionsMsg: diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 1a8f6ab624..d51bf1e7be 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -734,7 +734,7 @@ the transactions using a GetPooledTransactions request.`) } // Send announcement. - ann := eth.NewPooledTransactionHashesPacket{Types: txTypes, Sizes: sizes, Hashes: hashes} + ann := eth.NewPooledTransactionHashesPacket68{Types: txTypes, Sizes: sizes, Hashes: hashes} err = conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann) if err != nil { t.Fatalf("failed to write to connection: %v", err) @@ -753,7 +753,7 @@ the transactions using a GetPooledTransactions request.`) } return - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: continue case *eth.TransactionsPacket: continue @@ -823,12 +823,12 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { t2 = s.makeBlobTxs(2, 3, 0x2) ) for _, test := range []struct { - ann eth.NewPooledTransactionHashesPacket + ann eth.NewPooledTransactionHashesPacket68 resp eth.PooledTransactionsResponse }{ // Invalid tx size. { - ann: eth.NewPooledTransactionHashesPacket{ + ann: eth.NewPooledTransactionHashesPacket68{ Types: []byte{types.BlobTxType, types.BlobTxType}, Sizes: []uint32{uint32(t1[0].Size()), uint32(t1[1].Size() + 10)}, Hashes: []common.Hash{t1[0].Hash(), t1[1].Hash()}, @@ -837,7 +837,7 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { }, // Wrong tx type. 
{ - ann: eth.NewPooledTransactionHashesPacket{ + ann: eth.NewPooledTransactionHashesPacket68{ Types: []byte{types.DynamicFeeTxType, types.BlobTxType}, Sizes: []uint32{uint32(t2[0].Size()), uint32(t2[1].Size())}, Hashes: []common.Hash{t2[0].Hash(), t2[1].Hash()}, diff --git a/cmd/devp2p/internal/ethtest/transaction.go b/cmd/devp2p/internal/ethtest/transaction.go index 4fff50864e..295b29d66a 100644 --- a/cmd/devp2p/internal/ethtest/transaction.go +++ b/cmd/devp2p/internal/ethtest/transaction.go @@ -72,7 +72,7 @@ func (s *Suite) sendTxs(t *utesting.T, txs []*types.Transaction) error { for _, tx := range *msg { got[tx.Hash()] = true } - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: for _, hash := range msg.Hashes { got[hash] = true } @@ -157,7 +157,7 @@ func (s *Suite) sendInvalidTxs(t *utesting.T, txs []*types.Transaction) error { return fmt.Errorf("received bad tx: %s", tx.Hash()) } } - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: for _, hash := range msg.Hashes { if _, ok := invalids[hash]; ok { return fmt.Errorf("received bad tx: %s", hash) diff --git a/eth/downloader/bor_downloader_test.go b/eth/downloader/bor_downloader_test.go index 32bde123f6..884e9e1dd6 100644 --- a/eth/downloader/bor_downloader_test.go +++ b/eth/downloader/bor_downloader_test.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" + "github.com/stretchr/testify/assert" ) @@ -480,6 +481,8 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) } func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) } +func TestCanonicalSynchronisation67Full(t *testing.T) { testCanonSync(t, eth.ETH67, FullSync) } +func TestCanonicalSynchronisation67Snap(t *testing.T) { testCanonSync(t, eth.ETH67, SnapSync) } func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -501,6 +504,8 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { // until the cached blocks are retrieved. func TestThrottling68Full(t *testing.T) { testThrottling(t, eth.ETH68, FullSync) } func TestThrottling68Snap(t *testing.T) { testThrottling(t, eth.ETH68, SnapSync) } +func TestThrottling67Full(t *testing.T) { testThrottling(t, eth.ETH67, FullSync) } +func TestThrottling67Snap(t *testing.T) { testThrottling(t, eth.ETH67, SnapSync) } func testThrottling(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -586,6 +591,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { // binary search should be executed. func TestForkedSync68Full(t *testing.T) { testForkedSync(t, eth.ETH68, FullSync) } func TestForkedSync68Snap(t *testing.T) { testForkedSync(t, eth.ETH68, SnapSync) } +func TestForkedSync67Full(t *testing.T) { testForkedSync(t, eth.ETH67, FullSync) } +func TestForkedSync67Snap(t *testing.T) { testForkedSync(t, eth.ETH67, SnapSync) } func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -614,6 +621,8 @@ func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { // currently and is not dropped. 
func TestHeavyForkedSync68Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, FullSync) } func TestHeavyForkedSync68Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, SnapSync) } +func TestHeavyForkedSync67Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, FullSync) } +func TestHeavyForkedSync67Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, SnapSync) } func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -644,6 +653,8 @@ func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { // long dead chains. func TestBoundedForkedSync68Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, FullSync) } func TestBoundedForkedSync68Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, SnapSync) } +func TestBoundedForkedSync67Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, FullSync) } +func TestBoundedForkedSync67Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, SnapSync) } func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -678,6 +689,15 @@ func TestBoundedHeavyForkedSync68Snap(t *testing.T) { testBoundedHeavyForkedSync(t, eth.ETH68, SnapSync) } +func TestBoundedHeavyForkedSync67Full(t *testing.T) { + t.Parallel() + testBoundedHeavyForkedSync(t, eth.ETH67, FullSync) +} +func TestBoundedHeavyForkedSync67Snap(t *testing.T) { + t.Parallel() + testBoundedHeavyForkedSync(t, eth.ETH67, SnapSync) +} + func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) defer tester.terminate() @@ -705,6 +725,8 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that a canceled download wipes all previously accumulated state. func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) } func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) } +func TestCancel67Full(t *testing.T) { testCancel(t, eth.ETH67, FullSync) } +func TestCancel67Snap(t *testing.T) { testCancel(t, eth.ETH67, SnapSync) } func testCancel(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -734,6 +756,8 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) { // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). func TestMultiSynchronisation68Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, FullSync) } func TestMultiSynchronisation68Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, SnapSync) } +func TestMultiSynchronisation67Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, FullSync) } +func TestMultiSynchronisation67Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, SnapSync) } func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -759,6 +783,8 @@ func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { // and not wreak havoc on other nodes in the network. 
func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) } func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) } +func TestMultiProtoSynchronisation67Full(t *testing.T) { testMultiProtoSync(t, eth.ETH67, FullSync) } +func TestMultiProtoSynchronisation67Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH67, SnapSync) } func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -769,6 +795,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // Create peers of every type tester.newPeer("peer 68", eth.ETH68, chain.blocks[1:]) + tester.newPeer("peer 67", eth.ETH67, chain.blocks[1:]) // Synchronise with the requested peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { @@ -778,7 +805,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { assertOwnChain(t, tester, len(chain.blocks)) // Check that no peers have been dropped off - for _, version := range []int{68} { + for _, version := range []int{68, 67} { peer := fmt.Sprintf("peer %d", version) if _, ok := tester.peers[peer]; !ok { t.Errorf("%s dropped", peer) @@ -790,6 +817,8 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // made, and instead the header should be assembled into a whole block in itself. func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) } func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) } +func TestEmptyShortCircuit67Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, FullSync) } +func TestEmptyShortCircuit67Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, SnapSync) } func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -819,7 +848,7 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { bodiesNeeded, receiptsNeeded := 0, 0 for _, block := range chain.blocks[1:] { - if (mode == SnapSync || mode == FullSync) && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { + if len(block.Transactions()) > 0 || len(block.Uncles()) > 0 { bodiesNeeded++ } } @@ -843,6 +872,8 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // stalling the downloader by feeding gapped header chains. func TestMissingHeaderAttack68Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, FullSync) } func TestMissingHeaderAttack68Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, SnapSync) } +func TestMissingHeaderAttack67Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, FullSync) } +func TestMissingHeaderAttack67Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, SnapSync) } func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -870,6 +901,8 @@ func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { // detects the invalid numbering. 
func TestShiftedHeaderAttack68Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, FullSync) } func TestShiftedHeaderAttack68Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, SnapSync) } +func TestShiftedHeaderAttack67Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, FullSync) } +func TestShiftedHeaderAttack67Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, SnapSync) } func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -903,6 +936,15 @@ func TestHighTDStarvationAttack68Snap(t *testing.T) { testHighTDStarvationAttack(t, eth.ETH68, SnapSync) } +func TestHighTDStarvationAttack67Full(t *testing.T) { + t.Parallel() + testHighTDStarvationAttack(t, eth.ETH67, FullSync) +} +func TestHighTDStarvationAttack67Snap(t *testing.T) { + t.Parallel() + testHighTDStarvationAttack(t, eth.ETH67, SnapSync) +} + func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) defer tester.terminate() @@ -917,6 +959,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that misbehaving peers are disconnected, whilst behaving ones are not. func TestBlockHeaderAttackerDropping68(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH68) } +func TestBlockHeaderAttackerDropping67(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH67) } func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Define the disconnection requirement for individual hash fetch errors @@ -969,6 +1012,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // and highest block number) is tracked and updated correctly. func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) } func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) } +func TestSyncProgress67Full(t *testing.T) { testSyncProgress(t, eth.ETH67, FullSync) } +func TestSyncProgress67Snap(t *testing.T) { testSyncProgress(t, eth.ETH67, SnapSync) } func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1052,6 +1097,8 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync // revertal). func TestForkedSyncProgress68Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, FullSync) } func TestForkedSyncProgress68Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, SnapSync) } +func TestForkedSyncProgress67Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, FullSync) } +func TestForkedSyncProgress67Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, SnapSync) } func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1130,6 +1177,8 @@ func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // continuation of the previous sync and not a new instance. 
func TestFailedSyncProgress68Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, FullSync) } func TestFailedSyncProgress68Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, SnapSync) } +func TestFailedSyncProgress67Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, FullSync) } +func TestFailedSyncProgress67Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, SnapSync) } func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1203,6 +1252,8 @@ func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // the progress height is successfully reduced at the next sync invocation. func TestFakedSyncProgress68Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, FullSync) } func TestFakedSyncProgress68Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, SnapSync) } +func TestFakedSyncProgress67Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, FullSync) } +func TestFakedSyncProgress67Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, SnapSync) } func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1367,6 +1418,8 @@ func TestRemoteHeaderRequestSpan(t *testing.T) { // being fast-synced from, avoiding potential cheap eclipse attacks. func TestBeaconSync68Full(t *testing.T) { testBeaconSync(t, eth.ETH68, FullSync) } func TestBeaconSync68Snap(t *testing.T) { testBeaconSync(t, eth.ETH68, SnapSync) } +func TestBeaconSync67Full(t *testing.T) { testBeaconSync(t, eth.ETH67, FullSync) } +func TestBeaconSync67Snap(t *testing.T) { testBeaconSync(t, eth.ETH67, SnapSync) } func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { t.Helper() @@ -1477,12 +1530,37 @@ func (w *whitelistFake) GetMilestoneIDsList() []string { return nil } +// TestFakedSyncProgress67WhitelistMismatch tests if in case of whitelisted +// checkpoint mismatch with opposite peer, the sync should fail. +func TestFakedSyncProgress67WhitelistMismatch(t *testing.T) { + t.Parallel() + + protocol := uint(eth.ETH67) + mode := FullSync + + tester := newTester(t) + validate := func(count int) (bool, error) { + return false, whitelist.ErrMismatch + } + tester.downloader.ChainValidator = newWhitelistFake(validate) + + defer tester.terminate() + + chainA := testChainForkLightA.blocks + tester.newPeer("light", protocol, chainA[1:]) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("light", nil, mode); err == nil { + t.Fatal("succeeded attacker synchronisation") + } +} + // TestFakedSyncProgress67WhitelistMatch tests if in case of whitelisted // checkpoint match with opposite peer, the sync should succeed. -func TestFakedSyncProgress68WhitelistMatch(t *testing.T) { +func TestFakedSyncProgress67WhitelistMatch(t *testing.T) { t.Parallel() - protocol := uint(eth.ETH68) + protocol := uint(eth.ETH67) mode := FullSync tester := newTester(t) @@ -1505,10 +1583,10 @@ func TestFakedSyncProgress68WhitelistMatch(t *testing.T) { // TestFakedSyncProgress67NoRemoteCheckpoint tests if in case of missing/invalid // checkpointed blocks with opposite peer, the sync should fail initially but // with the retry mechanism, it should succeed eventually. 
-func TestFakedSyncProgress68NoRemoteCheckpoint(t *testing.T) { +func TestFakedSyncProgress67NoRemoteCheckpoint(t *testing.T) { t.Parallel() - protocol := uint(eth.ETH68) + protocol := uint(eth.ETH67) mode := FullSync tester := newTester(t) @@ -1544,8 +1622,8 @@ func TestFakedSyncProgress68NoRemoteCheckpoint(t *testing.T) { // TestFakedSyncProgress67BypassWhitelistValidation tests if peer validation // via whitelist is bypassed when remote peer is far away or not -func TestFakedSyncProgress68BypassWhitelistValidation(t *testing.T) { - protocol := uint(eth.ETH68) +func TestFakedSyncProgress67BypassWhitelistValidation(t *testing.T) { + protocol := uint(eth.ETH67) mode := FullSync tester := newTester(t) diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index b5236f9d94..1054da209f 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" ) @@ -96,7 +95,7 @@ func newSkeletonTestPeer(id string, headers []*types.Header) *skeletonTestPeer { } } -// newSkeletonTestPeerWithHook creates a new mock peer to test the skeleton sync with, +// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with, // and sets an optional serve hook that can return headers for delivery instead // of the predefined chain. Useful for emulating malicious behavior that would // otherwise require dedicated peer types. @@ -388,9 +387,23 @@ func TestSkeletonSyncInit(t *testing.T) { _ = skeleton.Terminate() // Ensure the correct resulting sync status - expect := skeletonExpect{state: tt.newstate} - if err := checkSkeletonProgress(db, false, nil, expect); err != nil { - t.Errorf("test %d: %v", i, err) + var progress skeletonProgress + + _ = json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.newstate) { + t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) + continue + } + + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.newstate[j].Head { + t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) + } + + if progress.Subchains[j].Tail != tt.newstate[j].Tail { + t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) + } } } } @@ -496,37 +509,31 @@ func TestSkeletonSyncExtend(t *testing.T) { skeleton.Terminate() // Ensure the correct resulting sync status - expect := skeletonExpect{state: tt.newstate} - if err := checkSkeletonProgress(db, false, nil, expect); err != nil { - t.Errorf("test %d: %v", i, err) - } - } -} + var progress skeletonProgress -type skeletonExpect struct { - state []*subchain // Expected sync state after the post-init event - serve uint64 // Expected number of header retrievals after initial cycle - drop uint64 // Expected number of peers dropped after initial cycle -} + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) -type skeletonTest struct { - fill bool // Whether to run a real backfiller in this test case - unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments + if len(progress.Subchains) != len(tt.newstate) { + t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, 
len(progress.Subchains), len(tt.newstate)) + continue + } - head *types.Header // New head header to announce to reorg to - peers []*skeletonTestPeer // Initial peer set to start the sync with - mid skeletonExpect + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.newstate[j].Head { + t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) + } - newHead *types.Header // New header to anoint on top of the old one - newPeer *skeletonTestPeer // New peer to join the skeleton syncer - end skeletonExpect + if progress.Subchains[j].Tail != tt.newstate[j].Tail { + t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) + } + } + } } // Tests that the skeleton sync correctly retrieves headers from one or more // peers without duplicates or other strange side effects. func TestSkeletonSyncRetrievals(t *testing.T) { - //log.SetDefault(log.NewLogger(log.NewGlogHandler(log.NewTerminalHandler(os.Stderr, false)))) - + //log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) // Since skeleton headers don't need to be meaningful, beyond a parent hash // progression, create a long fake chain to test with. chain := []*types.Header{{Number: big.NewInt(0)}} @@ -550,7 +557,23 @@ func TestSkeletonSyncRetrievals(t *testing.T) { Extra: []byte("B"), // force a different hash }) } - tests := []skeletonTest{ + + tests := []struct { + fill bool // Whether to run a real backfiller in this test case + unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments + + head *types.Header // New head header to announce to reorg to + peers []*skeletonTestPeer // Initial peer set to start the sync with + midstate []*subchain // Expected sync state after initial cycle + midserve uint64 // Expected number of header retrievals after initial cycle + middrop uint64 // Expected number of peers dropped after initial cycle + + newHead *types.Header // New header to anoint on top of the old one + newPeer *skeletonTestPeer // New peer to join the skeleton syncer + endstate []*subchain // Expected sync state after the post-init event + endserve uint64 // Expected number of header retrievals after the post-init event + enddrop uint64 // Expected number of peers dropped after the post-init event + }{ // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. No peers however, so // the sync should be stuck without any progression. @@ -558,16 +581,12 @@ func TestSkeletonSyncRetrievals(t *testing.T) { // When a new peer is added, it should detect the join and fill the headers // to the genesis block. { - head: chain[len(chain)-1], - mid: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: uint64(len(chain) - 1)}}, - }, + head: chain[len(chain)-1], + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: uint64(len(chain) - 1)}}, - newPeer: newSkeletonTestPeer("test-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, - serve: uint64(len(chain) - 2), // len - head - genesis - }, + newPeer: newSkeletonTestPeer("test-peer", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis }, // Completely empty database with only the genesis set. 
The sync is expected // to create a single subchain with the requested head. With one valid peer, @@ -575,18 +594,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { // // Adding a second peer should not have any effect. { - head: chain[len(chain)-1], - peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-1", chain)}, - mid: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, - serve: uint64(len(chain) - 2), // len - head - genesis - }, - - newPeer: newSkeletonTestPeer("test-peer-2", chain), - end: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, - serve: uint64(len(chain) - 2), // len - head - genesis - }, + head: chain[len(chain)-1], + peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-1", chain)}, + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + midserve: uint64(len(chain) - 2), // len - head - genesis + + newPeer: newSkeletonTestPeer("test-peer-2", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis }, // Completely empty database with only the genesis set. The sync is expected // to create a single subchain with the requested head. With many valid peers, @@ -600,16 +615,12 @@ func TestSkeletonSyncRetrievals(t *testing.T) { newSkeletonTestPeer("test-peer-2", chain), newSkeletonTestPeer("test-peer-3", chain), }, - mid: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, - serve: uint64(len(chain) - 2), // len - head - genesis - }, + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + midserve: uint64(len(chain) - 2), // len - head - genesis - newPeer: newSkeletonTestPeer("test-peer-4", chain), - end: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, - serve: uint64(len(chain) - 2), // len - head - genesis - }, + newPeer: newSkeletonTestPeer("test-peer-4", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis }, // This test checks if a peer tries to withhold a header - *on* the sync // boundary - instead of sending the requested amount. The malicious short @@ -621,18 +632,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:99]...), nil), chain[100:]...)), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 3, // len - head - genesis - missing - drop: 1, // penalize shortened header deliveries - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 3, // len - head - genesis - missing + middrop: 1, // penalize shortened header deliveries - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test checks if a peer tries to withhold a header - *off* the sync // boundary - instead of sending the requested amount. 
The malicious short @@ -644,18 +651,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:50]...), nil), chain[51:]...)), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 3, // len - head - genesis - missing - drop: 1, // penalize shortened header deliveries - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 3, // len - head - genesis - missing + middrop: 1, // penalize shortened header deliveries - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test checks if a peer tries to duplicate a header - *on* the sync // boundary - instead of sending the correct sequence. The malicious duped @@ -667,18 +670,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:99]...), chain[98]), chain[100:]...)), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 2, // len - head - genesis - drop: 1, // penalize invalid header sequences - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // penalize invalid header sequences - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test checks if a peer tries to duplicate a header - *off* the sync // boundary - instead of sending the correct sequence. 
The malicious duped @@ -690,18 +689,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { peers: []*skeletonTestPeer{ newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:50]...), chain[49]), chain[51:]...)), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 2, // len - head - genesis - drop: 1, // penalize invalid header sequences - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // penalize invalid header sequences - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test checks if a peer tries to inject a different header - *on* // the sync boundary - instead of sending the correct sequence. The bad @@ -724,18 +719,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { ), ), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 2, // len - head - genesis - drop: 1, // different set of headers, drop // TODO(karalabe): maybe just diff sync? - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // different set of headers, drop // TODO(karalabe): maybe just diff sync? - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test checks if a peer tries to inject a different header - *off* // the sync boundary - instead of sending the correct sequence. 
The bad @@ -758,18 +749,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { ), ), }, - mid: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, - serve: requestHeaders + 101 - 2, // len - head - genesis - drop: 1, // different set of headers, drop - }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // different set of headers, drop - newPeer: newSkeletonTestPeer("good-peer", chain), - end: skeletonExpect{ - state: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, - serve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis - drop: 1, // no new drops - }, + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops }, // This test reproduces a bug caught during review (kudos to @holiman) // where a subchain is merged with a previously interrupted one, causing @@ -799,16 +786,12 @@ func TestSkeletonSyncRetrievals(t *testing.T) { return nil // Fallback to default behavior, just delayed }), }, - mid: skeletonExpect{ - state: []*subchain{{Head: 2 * requestHeaders, Tail: 1}}, - serve: 2*requestHeaders - 1, // len - head - genesis - }, + midstate: []*subchain{{Head: 2 * requestHeaders, Tail: 1}}, + midserve: 2*requestHeaders - 1, // len - head - genesis - newHead: chain[2*requestHeaders+2], - end: skeletonExpect{ - state: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}}, - serve: 4 * requestHeaders, - }, + newHead: chain[2*requestHeaders+2], + endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}}, + endserve: 4 * requestHeaders, }, // This test reproduces a bug caught by (@rjl493456442) where a skeleton // header goes missing, causing the sync to get stuck and/or panic. 
@@ -830,17 +813,13 @@ func TestSkeletonSyncRetrievals(t *testing.T) { fill: true, unpredictable: true, // We have good and bad peer too, bad may be dropped, test too short for certainty - head: chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2 - peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)}, - mid: skeletonExpect{ - state: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}}, - }, + head: chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2 + peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)}, + midstate: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}}, - newHead: sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4 - newPeer: newSkeletonTestPeer("test-peer-newchain", sidechain), - end: skeletonExpect{ - state: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}}, - }, + newHead: sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4 + newPeer: newSkeletonTestPeer("test-peer-newchain", sidechain), + endstate: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}}, }, } for i, tt := range tests { @@ -853,7 +832,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { // Create a peer set to feed headers through peerset := newPeerSet() for _, peer := range tt.peers { - peerset.Register(newPeerConnection(peer.id, eth.ETH68, peer, log.New("id", peer.id))) + peerset.Register(newPeerConnection(peer.id, eth.ETH67, peer, log.New("id", peer.id))) } // Create a peer dropper to track malicious peers dropped := make(map[string]int) @@ -906,84 +885,138 @@ func TestSkeletonSyncRetrievals(t *testing.T) { skeleton := newSkeleton(db, peerset, drop, filler) _ = skeleton.Sync(tt.head, nil, true) + var progress skeletonProgress // Wait a bit (bleah) for the initial sync loop to go to idle. This might // be either a finish or a never-start hence why there's no event to hook. 
+ check := func() error { + if len(progress.Subchains) != len(tt.midstate) { + return fmt.Errorf("test %d, mid state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.midstate)) + } + + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.midstate[j].Head { + return fmt.Errorf("test %d, mid state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.midstate[j].Head) + } + + if progress.Subchains[j].Tail != tt.midstate[j].Tail { + return fmt.Errorf("test %d, mid state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.midstate[j].Tail) + } + } + + return nil + } + waitStart := time.Now() for waitTime := 20 * time.Millisecond; time.Since(waitStart) < 2*time.Second; waitTime = waitTime * 2 { time.Sleep(waitTime) - if err := checkSkeletonProgress(db, tt.unpredictable, tt.peers, tt.mid); err == nil { + // Check the post-init end state if it matches the required results + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if err := check(); err == nil { break } } - if err := checkSkeletonProgress(db, tt.unpredictable, tt.peers, tt.mid); err != nil { - t.Errorf("test %d, mid: %v", i, err) + + if err := check(); err != nil { + t.Error(err) continue } - // Apply the post-init events if there's any - endpeers := tt.peers - if tt.newPeer != nil { - if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH68, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { - t.Errorf("test %d: failed to register new peer: %v", i, err) + if !tt.unpredictable { + var served uint64 + for _, peer := range tt.peers { + served += peer.served.Load() + } + + if served != tt.midserve { + t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve) + } + + var drops uint64 + + for _, peer := range tt.peers { + drops += peer.dropped.Load() + } + + if drops != tt.middrop { + t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) } - time.Sleep(time.Millisecond * 50) // given time for peer registration - endpeers = append(tt.peers, tt.newPeer) } + // Apply the post-init events if there's any if tt.newHead != nil { - skeleton.Sync(tt.newHead, nil, true) + _ = skeleton.Sync(tt.newHead, nil, true) } + if tt.newPeer != nil { + if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH67, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { + t.Errorf("test %d: failed to register new peer: %v", i, err) + } + } // Wait a bit (bleah) for the second sync loop to go to idle. This might // be either a finish or a never-start hence why there's no event to hook. 
+ check = func() error { + if len(progress.Subchains) != len(tt.endstate) { + return fmt.Errorf("test %d, end state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.endstate)) + } + + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.endstate[j].Head { + return fmt.Errorf("test %d, end state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.endstate[j].Head) + } + + if progress.Subchains[j].Tail != tt.endstate[j].Tail { + return fmt.Errorf("test %d, end state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.endstate[j].Tail) + } + } + + return nil + } waitStart = time.Now() for waitTime := 20 * time.Millisecond; time.Since(waitStart) < 2*time.Second; waitTime = waitTime * 2 { time.Sleep(waitTime) - if err := checkSkeletonProgress(db, tt.unpredictable, endpeers, tt.end); err == nil { + // Check the post-init end state if it matches the required results + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if err := check(); err == nil { break } } - if err := checkSkeletonProgress(db, tt.unpredictable, endpeers, tt.end); err != nil { - t.Errorf("test %d, end: %v", i, err) + + if err := check(); err != nil { + t.Error(err) continue } // Check that the peers served no more headers than we actually needed - // Clean up any leftover skeleton sync resources - skeleton.Terminate() - } -} + if !tt.unpredictable { + served := uint64(0) + for _, peer := range tt.peers { + served += peer.served.Load() + } -func checkSkeletonProgress(db ethdb.KeyValueReader, unpredictable bool, peers []*skeletonTestPeer, expected skeletonExpect) error { - var progress skeletonProgress - // Check the post-init end state if it matches the required results - json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + if tt.newPeer != nil { + served += tt.newPeer.served.Load() + } - if len(progress.Subchains) != len(expected.state) { - return fmt.Errorf("subchain count mismatch: have %d, want %d", len(progress.Subchains), len(expected.state)) - } - for j := 0; j < len(progress.Subchains); j++ { - if progress.Subchains[j].Head != expected.state[j].Head { - return fmt.Errorf("subchain %d head mismatch: have %d, want %d", j, progress.Subchains[j].Head, expected.state[j].Head) - } - if progress.Subchains[j].Tail != expected.state[j].Tail { - return fmt.Errorf("subchain %d tail mismatch: have %d, want %d", j, progress.Subchains[j].Tail, expected.state[j].Tail) - } - } - if !unpredictable { - var served uint64 - for _, peer := range peers { - served += peer.served.Load() - } - if served != expected.serve { - return fmt.Errorf("served headers mismatch: have %d, want %d", served, expected.serve) - } - var drops uint64 - for _, peer := range peers { - drops += peer.dropped.Load() - } - if drops != expected.drop { - return fmt.Errorf("dropped peers mismatch: have %d, want %d", drops, expected.drop) + if served != tt.endserve { + t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) + } + + drops := uint64(0) + + for _, peer := range tt.peers { + drops += peer.dropped.Load() + } + + if tt.newPeer != nil { + drops += tt.newPeer.dropped.Load() + } + + if drops != tt.enddrop { + t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) + } } + // Clean up any leftover skeleton sync resources + skeleton.Terminate() } - return nil } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 3869afee67..27a4d3876f 
100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -68,7 +68,10 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: + return h.txFetcher.Notify(peer.ID(), nil, nil, *packet) + + case *eth.NewPooledTransactionHashesPacket68: return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes) case *eth.TransactionsPacket: diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 3866f66eac..a5affaa310 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -57,7 +57,11 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.blockBroadcasts.Send(packet.Block) return nil - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: + h.txAnnounces.Send(([]common.Hash)(*packet)) + return nil + + case *eth.NewPooledTransactionHashesPacket68: h.txAnnounces.Send(packet.Hashes) return nil @@ -76,6 +80,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Tests that peers are correctly accepted (or rejected) based on the advertised // fork IDs in the protocol handshake. +func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) } func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) } func testForkIDSplit(t *testing.T, protocol uint) { @@ -233,6 +238,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { } // Tests that received transactions are added to the local pool. +func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) } func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) } func testRecvTransactions(t *testing.T, protocol uint) { @@ -292,6 +298,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { } // This test checks that pending transactions are sent. +func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) } func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { @@ -350,7 +357,7 @@ func testSendTransactions(t *testing.T, protocol uint) { seen := make(map[common.Hash]struct{}) for len(seen) < len(insert) { switch protocol { - case 68: + case 67, 68: select { case hashes := <-anns: for _, hash := range hashes { @@ -376,6 +383,7 @@ func testSendTransactions(t *testing.T, protocol uint) { // Tests that transactions get propagated to all attached peers, either via direct // broadcasts or via announcements/retrievals. 
+func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) } func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } func testTransactionPropagation(t *testing.T, protocol uint) { @@ -483,8 +491,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { defer sourcePipe.Close() defer sinkPipe.Close() - sourcePeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) - sinkPeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) + sourcePeer := eth.NewPeer(eth.ETH67, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) + sinkPeer := eth.NewPeer(eth.ETH67, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) defer sourcePeer.Close() defer sinkPeer.Close() @@ -543,6 +551,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Tests that a propagated malformed block (uncles or transactions don't match // with the hashes in the header) gets discarded and not broadcast forward. +func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) } func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index ec8a37830a..f3826497e0 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -171,9 +171,16 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) go func() { - if err := p.sendPooledTransactionHashes(pending, pendingTypes, pendingSizes); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes66(pending); err != nil { + fail <- err + return + } } close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 2d69ecdc83..e24a24b2a1 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -93,6 +93,10 @@ type TxPool interface { func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { protocols := make([]p2p.Protocol, 0, len(ProtocolVersions)) for _, version := range ProtocolVersions { + // Blob transactions require eth/68 announcements, disable everything else + if version <= ETH67 && backend.Chain().Config().CancunBlock != nil { + continue + } version := version // Closure protocols = append(protocols, p2p.Protocol{ @@ -162,11 +166,26 @@ type Decoder interface { Time() time.Time } +var eth67 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes67, + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetReceiptsMsg: handleGetReceipts, + ReceiptsMsg: handleReceipts, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, +} + var eth68 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - 
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, GetBlockHeadersMsg: handleGetBlockHeaders, BlockHeadersMsg: handleBlockHeaders, GetBlockBodiesMsg: handleGetBlockBodies, @@ -190,8 +209,10 @@ func handleMessage(backend Backend, peer *Peer) error { } defer msg.Discard() - var handlers = eth68 - + var handlers = eth67 + if peer.Version() >= ETH68 { + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index d3a0f978e5..8670ea6ba0 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -61,7 +61,7 @@ func newTestBackend(blocks int) *testBackend { return newTestBackendWithGenerator(blocks, false, nil) } -// newTestBackendWithGenerator creates a chain with a number of explicitly defined blocks and +// newTestBackend creates a chain with a number of explicitly defined blocks and // wraps it into a mock backend. // nolint:typecheck func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, *core.BlockGen)) *testBackend { @@ -151,6 +151,7 @@ func (b *testBackend) Handle(*Peer, Packet) error { } // Tests that block headers can be retrieved from a remote chain based on user queries. +func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) } func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) } func testGetBlockHeaders(t *testing.T, protocol uint) { @@ -338,6 +339,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { } // Tests that block contents can be retrieved from a remote chain based on their hashes. +func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) } func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) } func testGetBlockBodies(t *testing.T, protocol uint) { @@ -432,6 +434,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } // Tests that the transaction receipts can be retrieved based on hashes. 
+func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) } func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) } func testGetBlockReceipts(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 8aa471984c..e8f3fb7b8c 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -207,7 +207,7 @@ func serviceContiguousBlockHeaderQuery(chain *core.BlockChain, query *GetBlockHe return headers } { // Last mode: deliver ancestors of H - for i := uint64(1); i < count; i++ { + for i := uint64(1); header != nil && i < count; i++ { header = chain.GetHeaderByHash(header.ParentHash) if header == nil { break @@ -425,13 +425,33 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { +func handleNewPooledTransactionHashes67(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket) + ann := new(NewPooledTransactionHashesPacket67) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range *ann { + peer.markTransaction(hash) + } + + return backend.Handle(peer, ann) +} + +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + + ann := new(NewPooledTransactionHashesPacket68) + if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index fa415cb7cf..6089723e57 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -27,6 +27,7 @@ import ( ) // Tests that handshake failures are detected and reported correctly. +func TestHandshake67(t *testing.T) { testHandshake(t, ETH67) } func TestHandshake68(t *testing.T) { testHandshake(t, ETH68) } func testHandshake(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 95ac5ad811..21fd966d58 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -56,6 +56,14 @@ const ( maxQueuedBlockAnns = 4 ) +// max is a helper function which returns the larger of the two given integers. +func max(a, b int) int { + if a > b { + return a + } + return b +} + // Peer is a collection of relevant information we have about a `eth` peer. type Peer struct { id string // Unique ID for the peer, cached @@ -84,7 +92,7 @@ type Peer struct { lock sync.RWMutex // Mutex protecting the internal fields } -// NewPeer creates a wrapper for a network connection and negotiated protocol +// NewPeer create a wrapper for a network connection and negotiated protocol // version. 
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ @@ -202,17 +210,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { } } -// sendPooledTransactionHashes sends transaction hashes (tagged with their type +// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes +// them in its transaction hash set for future reference. +// +// This method is a helper used by the async transaction announcer. Don't call it +// directly as the queueing (memory) and transmission (bandwidth) costs should +// not be managed directly. +func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket67(hashes)) +} + +// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type // and size) to the peer and includes them in its transaction hash set for future // reference. // // This method is a helper used by the async transaction announcer. Don't call it // directly as the queueing (memory) and transmission (bandwidth) costs should // not be managed directly. -func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash, types []byte, sizes []uint32) error { +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket{Types: types, Sizes: sizes, Hashes: hashes}) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) } // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index c0534b4da5..4b69c635ad 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -30,6 +30,7 @@ import ( // Constants to match up protocol versions and messages const ( + ETH67 = 67 ETH68 = 68 ) @@ -39,11 +40,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH68} +var ProtocolVersions = []uint{ETH68, ETH67} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH68: 17} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -290,8 +291,11 @@ type ReceiptsRLPPacket struct { ReceiptsRLPResponse } -// NewPooledTransactionHashesPacket represents a transaction announcement packet on eth/68 and newer. -type NewPooledTransactionHashesPacket struct { +// NewPooledTransactionHashesPacket67 represents a transaction announcement packet on eth/67. +type NewPooledTransactionHashesPacket67 []common.Hash + +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. 
+type NewPooledTransactionHashesPacket68 struct { Types []byte Sizes []uint32 Hashes []common.Hash @@ -350,8 +354,10 @@ func (*BlockBodiesResponse) Kind() byte { return BlockBodiesMsg } func (*NewBlockPacket) Name() string { return "NewBlock" } func (*NewBlockPacket) Kind() byte { return NewBlockMsg } -func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket67) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket67) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } func (*GetPooledTransactionsRequest) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsRequest) Kind() byte { return GetPooledTransactionsMsg } diff --git a/eth/sync_test.go b/eth/sync_test.go index e3def8029d..a476f6234c 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -28,6 +28,7 @@ import ( ) // Tests that snap sync is disabled after a successful sync cycle. +func TestSnapSyncDisabling67(t *testing.T) { testSnapSyncDisabling(t, eth.ETH67, snap.SNAP1) } func TestSnapSyncDisabling68(t *testing.T) { testSnapSyncDisabling(t, eth.ETH68, snap.SNAP1) } // Tests that snap sync gets disabled as soon as a real block is successfully
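Note: the core of this patch is the split between the eth/67 announcement payload (a bare hash list, NewPooledTransactionHashesPacket67) and the eth/68 payload (hashes tagged with type and size, NewPooledTransactionHashesPacket68), plus the version gate in eth/protocols/eth/broadcast.go that picks between them when announcing pooled transactions. The standalone Go sketch below illustrates that gate under simplified assumptions; Hash, hashesPacket67, hashesPacket68 and buildAnnouncement are illustrative stand-ins, not the identifiers used in the patch.

// announce_sketch.go -- standalone illustration only; simplified stand-ins for
// the announcement types introduced in eth/protocols/eth/protocol.go.
package main

import "fmt"

// Hash is a placeholder for common.Hash.
type Hash [32]byte

// hashesPacket67 mirrors NewPooledTransactionHashesPacket67: eth/67 (and older)
// announces bare transaction hashes.
type hashesPacket67 []Hash

// hashesPacket68 mirrors NewPooledTransactionHashesPacket68: eth/68 additionally
// carries each transaction's type byte and encoded size, so the receiver can
// decide what to fetch (blob transactions in particular need this metadata).
type hashesPacket68 struct {
	Types  []byte
	Sizes  []uint32
	Hashes []Hash
}

const (
	eth67 = 67
	eth68 = 68
)

// buildAnnouncement mirrors the version gate added to announceTransactions in
// eth/protocols/eth/broadcast.go: peers that negotiated eth/68 get the typed,
// sized payload, older peers get the plain hash list.
func buildAnnouncement(version uint, hashes []Hash, types []byte, sizes []uint32) interface{} {
	if version >= eth68 {
		return hashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}
	}
	return hashesPacket67(hashes)
}

func main() {
	hashes := []Hash{{0x01}, {0x02}}
	types := []byte{0x02, 0x03}     // e.g. dynamic-fee and blob transaction type bytes
	sizes := []uint32{120, 131_200} // encoded sizes in bytes

	fmt.Printf("eth/67 payload: %T\n", buildAnnouncement(eth67, hashes, types, sizes))
	fmt.Printf("eth/68 payload: %T\n", buildAnnouncement(eth68, hashes, types, sizes))
}

The same constraint is why MakeProtocols in eth/protocols/eth/handler.go skips eth/67 once Cancun is scheduled: blob transactions can only be announced usefully with the type/size metadata that eth/68 carries.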