snap: track revertals when peer rejects request #22016

Merged · 3 commits · Jan 7, 2021
eth/protocols/snap/sync.go: 116 additions & 56 deletions
@@ -792,10 +792,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
}
req.timeout = time.AfterFunc(requestTimeout, func() {
log.Debug("Account range request timed out")
- select {
- case s.accountReqFails <- req:
- default:
- }
+ s.scheduleRevertAccountRequest(req)
})
s.accountReqs[reqid] = req
delete(s.accountIdlers, idle)
@@ -807,12 +804,8 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
// Attempt to send the remote request and revert if it fails
if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil {
peer.Log().Debug("Failed to request account range", "err", err)
- select {
- case s.accountReqFails <- req:
- default:
- }
+ s.scheduleRevertAccountRequest(req)
}
- // Request successfully sent, start a
}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists

// Inject the request into the task to block further assignments
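The same two-step change repeats below for bytecodes, storage ranges, trienode heals, and bytecode heals, on both the timeout-timer goroutine and the request-sending goroutine. The crux is the send semantics: the old code used a non-blocking send, which silently dropped the revert signal whenever the event loop was not ready to receive it, leaking the request. A minimal sketch of the difference (channel and type names taken from this diff, surrounding code elided):

```go
// Before: best-effort notification. If the event loop is not currently
// draining accountReqFails, the default case fires and the revert is lost.
select {
case s.accountReqFails <- req:
default:
}

// After: block until one of three things happens, so the signal cannot
// be lost. scheduleRevertAccountRequest wraps exactly this select.
select {
case s.accountReqFails <- req: // event loop accepted the revert
case <-req.cancel: // sync cycle was torn down
case <-req.stale: // request was already reverted elsewhere
}
```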
@@ -886,10 +879,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
}
req.timeout = time.AfterFunc(requestTimeout, func() {
log.Debug("Bytecode request timed out")
- select {
- case s.bytecodeReqFails <- req:
- default:
- }
+ s.scheduleRevertBytecodeRequest(req)
})
s.bytecodeReqs[reqid] = req
delete(s.bytecodeIdlers, idle)
@@ -901,12 +891,8 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
// Attempt to send the remote request and revert if it fails
if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
log.Debug("Failed to request bytecodes", "err", err)
- select {
- case s.bytecodeReqFails <- req:
- default:
- }
+ s.scheduleRevertBytecodeRequest(req)
}
- // Request successfully sent, start a
}(s.peers[idle]) // We're in the lock, peers[id] surely exists
}
}
@@ -1018,10 +1004,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
}
req.timeout = time.AfterFunc(requestTimeout, func() {
log.Debug("Storage request timed out")
- select {
- case s.storageReqFails <- req:
- default:
- }
+ s.scheduleRevertStorageRequest(req)
})
s.storageReqs[reqid] = req
delete(s.storageIdlers, idle)
@@ -1037,12 +1020,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
}
if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil {
log.Debug("Failed to request storage", "err", err)
- select {
- case s.storageReqFails <- req:
- default:
- }
+ s.scheduleRevertStorageRequest(req)
}
- // Request successfully sent, start a
}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists

// Inject the request into the subtask to block further assignments
@@ -1140,10 +1119,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
}
req.timeout = time.AfterFunc(requestTimeout, func() {
log.Debug("Trienode heal request timed out")
- select {
- case s.trienodeHealReqFails <- req:
- default:
- }
+ s.scheduleRevertTrienodeHealRequest(req)
})
s.trienodeHealReqs[reqid] = req
delete(s.trienodeHealIdlers, idle)
@@ -1155,12 +1131,8 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
// Attempt to send the remote request and revert if it fails
if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil {
log.Debug("Failed to request trienode healers", "err", err)
- select {
- case s.trienodeHealReqFails <- req:
- default:
- }
+ s.scheduleRevertTrienodeHealRequest(req)
}
- // Request successfully sent, start a
}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
}
}
@@ -1245,10 +1217,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
}
req.timeout = time.AfterFunc(requestTimeout, func() {
log.Debug("Bytecode heal request timed out")
- select {
- case s.bytecodeHealReqFails <- req:
- default:
- }
+ s.scheduleRevertBytecodeHealRequest(req)
})
s.bytecodeHealReqs[reqid] = req
delete(s.bytecodeHealIdlers, idle)
@@ -1260,12 +1229,8 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
// Attempt to send the remote request and revert if it fails
if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
log.Debug("Failed to request bytecode healers", "err", err)
- select {
- case s.bytecodeHealReqFails <- req:
- default:
- }
+ s.scheduleRevertBytecodeHealRequest(req)
}
- // Request successfully sent, start a
}(s.peers[idle]) // We're in the lock, peers[id] surely exists
}
}
@@ -1325,10 +1290,26 @@ func (s *Syncer) revertRequests(peer string) {
}
}

+ // scheduleRevertAccountRequest asks the event loop to clean up an account range
+ // request and return all failed retrieval tasks to the scheduler for reassignment.
+ func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
+ select {
+ case s.accountReqFails <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+ }
+
// revertAccountRequest cleans up an account range request and returns all failed
// retrieval tasks to the scheduler for reassignment.
+ //
+ // Note, this needs to run on the event runloop thread to reschedule to idle peers.
+ // On peer threads, use scheduleRevertAccountRequest.
func (s *Syncer) revertAccountRequest(req *accountRequest) {
log.Trace("Reverting account request", "peer", req.peer, "reqid", req.id)
log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id)
select {
case <-req.stale:
log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1350,10 +1331,26 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) {
}
}

- // revertBytecodeRequest cleans up an bytecode request and returns all failed
+ // scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request
+ // and return all failed retrieval tasks to the scheduler for reassignment.
+ func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
+ select {
+ case s.bytecodeReqFails <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+ }
+
+ // revertBytecodeRequest cleans up a bytecode request and returns all failed
// retrieval tasks to the scheduler for reassignment.
+ //
+ // Note, this needs to run on the event runloop thread to reschedule to idle peers.
+ // On peer threads, use scheduleRevertBytecodeRequest.
func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
log.Trace("Reverting bytecode request", "peer", req.peer)
log.Debug("Reverting bytecode request", "peer", req.peer)
select {
case <-req.stale:
log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1375,10 +1372,26 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
}
}

+ // scheduleRevertStorageRequest asks the event loop to clean up a storage range
+ // request and return all failed retrieval tasks to the scheduler for reassignment.
+ func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
+ select {
+ case s.storageReqFails <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+ }
+
// revertStorageRequest cleans up a storage range request and returns all failed
// retrieval tasks to the scheduler for reassignment.
+ //
+ // Note, this needs to run on the event runloop thread to reschedule to idle peers.
+ // On peer threads, use scheduleRevertStorageRequest.
func (s *Syncer) revertStorageRequest(req *storageRequest) {
log.Trace("Reverting storage request", "peer", req.peer)
log.Debug("Reverting storage request", "peer", req.peer)
select {
case <-req.stale:
log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1404,10 +1417,26 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) {
}
}

- // revertTrienodeHealRequest cleans up an trienode heal request and returns all
+ // scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
+ // request and return all failed retrieval tasks to the scheduler for reassignment.
+ func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
+ select {
+ case s.trienodeHealReqFails <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+ }
+
+ // revertTrienodeHealRequest cleans up a trienode heal request and returns all
// failed retrieval tasks to the scheduler for reassignment.
+ //
+ // Note, this needs to run on the event runloop thread to reschedule to idle peers.
+ // On peer threads, use scheduleRevertTrienodeHealRequest.
func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
log.Trace("Reverting trienode heal request", "peer", req.peer)
log.Debug("Reverting trienode heal request", "peer", req.peer)
select {
case <-req.stale:
log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1429,10 +1458,26 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
}
}

- // revertBytecodeHealRequest cleans up an bytecode request and returns all failed
- // retrieval tasks to the scheduler for reassignment.
+ // scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
+ // request and return all failed retrieval tasks to the scheduler for reassignment.
+ func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
+ select {
+ case s.bytecodeHealReqFails <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+ }
+
+ // revertBytecodeHealRequest cleans up a bytecode heal request and returns all
+ // failed retrieval tasks to the scheduler for reassignment.
+ //
+ // Note, this needs to run on the event runloop thread to reschedule to idle peers.
+ // On peer threads, use scheduleRevertBytecodeHealRequest.
func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
log.Trace("Reverting bytecode heal request", "peer", req.peer)
log.Debug("Reverting bytecode heal request", "peer", req.peer)
select {
case <-req.stale:
log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1768,7 +1813,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
if err := batch.Write(); err != nil {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
}

// processBytecodeHealResponse integrates an already validated bytecode response
@@ -1804,7 +1849,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
if err := batch.Write(); err != nil {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize()))
}

// forwardAccountTask takes a filled account task and persists anything available
@@ -1940,6 +1985,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
logger.Debug("Peer rejected account range request", "root", s.root)
s.statelessPeers[peer.id] = struct{}{}
s.lock.Unlock()

+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertAccountRequest(req)
return nil
}
root := s.root
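Every delivery handler below gains the same three lines. When a peer answers with an empty response it is signalling that it cannot serve state for this root, so the handler marks it stateless, hands the request back for reassignment, and returns nil to keep the connection alive. A condensed sketch of that branch, paraphrasing the OnAccounts hunk above (the exact emptiness check is abridged):

```go
// Condensed rejection branch: an empty reply is a capability signal,
// not a protocol violation.
if len(hashes) == 0 && len(accounts) == 0 {
	logger.Debug("Peer rejected account range request", "root", s.root)
	s.statelessPeers[peer.id] = struct{}{} // skip this peer for this root
	s.lock.Unlock()

	// New in this PR: previously the request was dropped on the floor here;
	// now it is explicitly rescheduled so another peer can pick it up.
	s.scheduleRevertAccountRequest(req)
	return nil // rejection is not a fault, keep the peer connected
}
```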
@@ -2055,6 +2103,9 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
logger.Debug("Peer rejected bytecode request")
s.statelessPeers[peer.id] = struct{}{}
s.lock.Unlock()

+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeRequest(req)
return nil
}
s.lock.Unlock()
@@ -2166,6 +2217,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
logger.Debug("Peer rejected storage request")
s.statelessPeers[peer.id] = struct{}{}
s.lock.Unlock()

+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertStorageRequest(req)
return nil
}
s.lock.Unlock()
@@ -2287,6 +2341,9 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
logger.Debug("Peer rejected trienode heal request")
s.statelessPeers[peer.id] = struct{}{}
s.lock.Unlock()

+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertTrienodeHealRequest(req)
return nil
}
s.lock.Unlock()
@@ -2371,6 +2428,9 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro
logger.Debug("Peer rejected bytecode heal request")
s.statelessPeers[peer.id] = struct{}{}
s.lock.Unlock()

+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeHealRequest(req)
return nil
}
s.lock.Unlock()