Skip to content

Commit

Permalink
fix(dot/sync): fix block request and response logic (#1907)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Oct 29, 2021
1 parent 8c8f6d0 commit 9c6283e
Show file tree
Hide file tree
Showing 11 changed files with 791 additions and 76 deletions.
2 changes: 1 addition & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc

err := s.host.writeToStream(hsData.stream, msg)
if err != nil {
logger.Trace("failed to send message to peer", "peer", peer, "error", err)
logger.Debug("failed to send message to peer", "peer", peer, "error", err)
}
}

Expand Down
10 changes: 10 additions & 0 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ func (bs *BlockState) AddBlockToBlockTree(header *types.Header) error {
return bs.bt.AddBlock(header, arrivalTime)
}

// GetAllBlocksAtNumber returns all unfinalised blocks with the given number
func (bs *BlockState) GetAllBlocksAtNumber(num *big.Int) ([]common.Hash, error) {
header, err := bs.GetHeaderByNumber(num)
if err != nil {
return nil, err
}

return bs.GetAllBlocksAtDepth(header.ParentHash), nil
}

// GetAllBlocksAtDepth returns all hashes with the depth of the given hash plus one
func (bs *BlockState) GetAllBlocksAtDepth(hash common.Hash) []common.Hash {
return bs.bt.GetAllBlocksAtNumber(hash)
Expand Down
30 changes: 25 additions & 5 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,7 @@ func (cs *chainSync) doSync(req *network.BlockRequestMessage) *workerError {

if req.Direction == network.Descending {
// reverse blocks before pre-validating and placing in ready queue
for i, j := 0, len(resp.BlockData)-1; i < j; i, j = i+1, j-1 {
resp.BlockData[i], resp.BlockData[j] = resp.BlockData[j], resp.BlockData[i]
}
reverseBlockData(resp.BlockData)
}

// perform some pre-validation of response, error if failure
Expand Down Expand Up @@ -897,10 +895,18 @@ func workerToRequests(w *worker) ([]*network.BlockRequestMessage, error) {
} else {
// in tip-syncing mode, we know the hash of the block on the fork we wish to sync
start, _ = variadic.NewUint64OrHash(w.startHash)

// if we're doing descending requests and not at the last (highest starting) request,
// then use number as start block
if w.direction == network.Descending && i != numRequests-1 {
start = variadic.MustNewUint64OrHash(startNumber)
}
}

var end *common.Hash
if !w.targetHash.IsEmpty() {
if !w.targetHash.IsEmpty() && i == numRequests-1 {
// if we're on our last request (which should contain the target hash),
// then add it
end = &w.targetHash
}

Expand All @@ -911,7 +917,21 @@ func workerToRequests(w *worker) ([]*network.BlockRequestMessage, error) {
Direction: w.direction,
Max: &max,
}
startNumber += maxResponseSize

switch w.direction {
case network.Ascending:
startNumber += maxResponseSize
case network.Descending:
startNumber -= maxResponseSize
}
}

// if our direction is descending, we want to send out the request with the lowest
// startNumber first
if w.direction == network.Descending {
for i, j := 0, len(reqs)-1; i < j; i, j = i+1, j-1 {
reqs[i], reqs[j] = reqs[j], reqs[i]
}
}

return reqs, nil
Expand Down
24 changes: 24 additions & 0 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,30 @@ func TestWorkerToRequests(t *testing.T) {
},
},
},
{
w: &worker{
startNumber: big.NewInt(1 + maxResponseSize + (maxResponseSize / 2)),
targetNumber: big.NewInt(1),
direction: network.Descending,
requestData: bootstrapRequestData,
},
expected: []*network.BlockRequestMessage{
{
RequestedData: network.RequestedDataHeader + network.RequestedDataBody + network.RequestedDataJustification,
StartingBlock: *variadic.MustNewUint64OrHash(1 + (maxResponseSize / 2)),
EndBlockHash: nil,
Direction: network.Descending,
Max: &max64,
},
{
RequestedData: bootstrapRequestData,
StartingBlock: *variadic.MustNewUint64OrHash(1 + maxResponseSize + (maxResponseSize / 2)),
EndBlockHash: nil,
Direction: network.Descending,
Max: &max128,
},
},
},
}

for i, tc := range testCases {
Expand Down
8 changes: 7 additions & 1 deletion dot/sync/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ var (
ErrInvalidBlock = errors.New("could not verify block")

// ErrInvalidBlockRequest is returned when an invalid block request is received
ErrInvalidBlockRequest = errors.New("invalid block request")
ErrInvalidBlockRequest = errors.New("invalid block request")
errInvalidRequestDirection = errors.New("invalid request direction")
errRequestStartTooHigh = errors.New("request start number is higher than our best block")
errFailedToGetEndHashAncestor = errors.New("failed to get ancestor of end block")

// chainSync errors
errEmptyBlockData = errors.New("empty block data")
Expand All @@ -57,6 +60,9 @@ var (
errUnknownParent = errors.New("parent of first block in block response is unknown")
errUnknownBlockForJustification = errors.New("received justification for unknown block")
errFailedToGetParent = errors.New("failed to get parent header")
errNilDescendantNumber = errors.New("descendant number is nil")
errStartAndEndMismatch = errors.New("request start and end hash are not on the same chain")
errFailedToGetDescendant = errors.New("failed to find descendant block")
)

// ErrNilChannel is returned if a channel is nil
Expand Down
3 changes: 3 additions & 0 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type BlockState interface {
StoreRuntime(common.Hash, runtime.Instance)
GetHighestFinalisedHeader() (*types.Header, error)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
GetHeaderByNumber(num *big.Int) (*types.Header, error)
GetAllBlocksAtNumber(num *big.Int) ([]common.Hash, error)
IsDescendantOf(parent, child common.Hash) (bool, error)
}

// StorageState is the interface for the storage state
Expand Down
Loading

0 comments on commit 9c6283e

Please sign in to comment.