From d1c5d0962a32d683bb11cc98ca67a98fc27c80c8 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 4 Dec 2020 10:55:23 +0100 Subject: [PATCH 1/3] eth/protocols/snap: reschedule missed deliveries --- eth/protocols/snap/sync.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 437b0caab408..7476d50d7293 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -1328,7 +1328,7 @@ func (s *Syncer) revertRequests(peer string) { // revertAccountRequest cleans up an account range request and returns all failed // retrieval tasks to the scheduler for reassignment. func (s *Syncer) revertAccountRequest(req *accountRequest) { - log.Trace("Reverting account request", "peer", req.peer, "reqid", req.id) + log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id) select { case <-req.stale: log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id) @@ -1353,7 +1353,7 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) { // revertBytecodeRequest cleans up an bytecode request and returns all failed // retrieval tasks to the scheduler for reassignment. func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { - log.Trace("Reverting bytecode request", "peer", req.peer) + log.Debug("Reverting bytecode request", "peer", req.peer) select { case <-req.stale: log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id) @@ -1378,7 +1378,7 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { // revertStorageRequest cleans up a storage range request and returns all failed // retrieval tasks to the scheduler for reassignment. func (s *Syncer) revertStorageRequest(req *storageRequest) { - log.Trace("Reverting storage request", "peer", req.peer) + log.Debug("Reverting storage request", "peer", req.peer) select { case <-req.stale: log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id) @@ -1407,7 +1407,7 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) { // revertTrienodeHealRequest cleans up an trienode heal request and returns all // failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { - log.Trace("Reverting trienode heal request", "peer", req.peer) + log.Debug("Reverting trienode heal request", "peer", req.peer) select { case <-req.stale: log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id) @@ -1432,7 +1432,7 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { // revertBytecodeHealRequest cleans up an bytecode request and returns all failed // retrieval tasks to the scheduler for reassignment. func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) { - log.Trace("Reverting bytecode heal request", "peer", req.peer) + log.Debug("Reverting bytecode heal request", "peer", req.peer) select { case <-req.stale: log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id) @@ -1940,6 +1940,8 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account logger.Debug("Peer rejected account range request", "root", s.root) s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling + s.revertAccountRequest(req) return nil } root := s.root @@ -2055,6 +2057,8 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error { logger.Debug("Peer rejected bytecode request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling + s.revertBytecodeRequest(req) return nil } s.lock.Unlock() @@ -2166,6 +2170,8 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots logger.Debug("Peer rejected storage request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling + s.revertStorageRequest(req) return nil } s.lock.Unlock() @@ -2287,6 +2293,8 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error { logger.Debug("Peer rejected trienode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling + s.revertTrienodeHealRequest(req) return nil } s.lock.Unlock() @@ -2371,6 +2379,8 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro logger.Debug("Peer rejected bytecode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling + s.revertBytecodeHealRequest(req) return nil } s.lock.Unlock() From 8e60341760bb3457b782b5875455de120a2a38a9 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 4 Dec 2020 11:43:25 +0100 Subject: [PATCH 2/3] eth/protocols/snap: clarify log message --- eth/protocols/snap/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 7476d50d7293..5d5a307d42bc 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -1768,7 +1768,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } - log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize())) + log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) } // processBytecodeHealResponse integrates an already validated bytecode response @@ -1804,7 +1804,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } - log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize())) + log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize())) } // forwardAccountTask takes a filled account task and persists anything available From 3950d2101cfce27255f787afe66a1dbf2e55e2c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 6 Jan 2021 09:11:36 +0200 Subject: [PATCH 3/3] eth/protocols/snap: revert failures async and update runloop --- eth/protocols/snap/sync.go | 158 ++++++++++++++++++++++++------------- 1 file changed, 104 insertions(+), 54 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 5d5a307d42bc..82b21c470133 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -792,10 +792,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Account range request timed out") - select { - case s.accountReqFails <- req: - default: - } + s.scheduleRevertAccountRequest(req) }) s.accountReqs[reqid] = req delete(s.accountIdlers, idle) @@ -807,12 +804,8 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil { peer.Log().Debug("Failed to request account range", "err", err) - select { - case s.accountReqFails <- req: - default: - } + s.scheduleRevertAccountRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists // Inject the request into the task to block further assignments @@ -886,10 +879,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Bytecode request timed out") - select { - case s.bytecodeReqFails <- req: - default: - } + s.scheduleRevertBytecodeRequest(req) }) s.bytecodeReqs[reqid] = req delete(s.bytecodeIdlers, idle) @@ -901,12 +891,8 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil { log.Debug("Failed to request bytecodes", "err", err) - select { - case s.bytecodeReqFails <- req: - default: - } + s.scheduleRevertBytecodeRequest(req) } - // Request successfully sent, start a }(s.peers[idle]) // We're in the lock, peers[id] surely exists } } @@ -1018,10 +1004,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Storage request timed out") - select { - case s.storageReqFails <- req: - default: - } + s.scheduleRevertStorageRequest(req) }) s.storageReqs[reqid] = req delete(s.storageIdlers, idle) @@ -1037,12 +1020,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { } if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil { log.Debug("Failed to request storage", "err", err) - select { - case s.storageReqFails <- req: - default: - } + s.scheduleRevertStorageRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists // Inject the request into the subtask to block further assignments @@ -1140,10 +1119,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Trienode heal request timed out") - select { - case s.trienodeHealReqFails <- req: - default: - } + s.scheduleRevertTrienodeHealRequest(req) }) s.trienodeHealReqs[reqid] = req delete(s.trienodeHealIdlers, idle) @@ -1155,12 +1131,8 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil { log.Debug("Failed to request trienode healers", "err", err) - select { - case s.trienodeHealReqFails <- req: - default: - } + s.scheduleRevertTrienodeHealRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists } } @@ -1245,10 +1217,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Bytecode heal request timed out") - select { - case s.bytecodeHealReqFails <- req: - default: - } + s.scheduleRevertBytecodeHealRequest(req) }) s.bytecodeHealReqs[reqid] = req delete(s.bytecodeHealIdlers, idle) @@ -1260,12 +1229,8 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil { log.Debug("Failed to request bytecode healers", "err", err) - select { - case s.bytecodeHealReqFails <- req: - default: - } + s.scheduleRevertBytecodeHealRequest(req) } - // Request successfully sent, start a }(s.peers[idle]) // We're in the lock, peers[id] surely exists } } @@ -1325,8 +1290,24 @@ func (s *Syncer) revertRequests(peer string) { } } +// scheduleRevertAccountRequest asks the event loop to clean up an account range +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) { + select { + case s.accountReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + // revertAccountRequest cleans up an account range request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertAccountRequest. func (s *Syncer) revertAccountRequest(req *accountRequest) { log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id) select { @@ -1350,8 +1331,24 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) { } } -// revertBytecodeRequest cleans up an bytecode request and returns all failed +// scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request +// and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) { + select { + case s.bytecodeReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertBytecodeRequest cleans up a bytecode request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertBytecodeRequest. func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { log.Debug("Reverting bytecode request", "peer", req.peer) select { @@ -1375,8 +1372,24 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { } } +// scheduleRevertStorageRequest asks the event loop to clean up a storage range +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) { + select { + case s.storageReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + // revertStorageRequest cleans up a storage range request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertStorageRequest. func (s *Syncer) revertStorageRequest(req *storageRequest) { log.Debug("Reverting storage request", "peer", req.peer) select { @@ -1404,8 +1417,24 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) { } } -// revertTrienodeHealRequest cleans up an trienode heal request and returns all +// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) { + select { + case s.trienodeHealReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertTrienodeHealRequest cleans up a trienode heal request and returns all // failed retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertTrienodeHealRequest. func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { log.Debug("Reverting trienode heal request", "peer", req.peer) select { @@ -1429,8 +1458,24 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { } } -// revertBytecodeHealRequest cleans up an bytecode request and returns all failed -// retrieval tasks to the scheduler for reassignment. +// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) { + select { + case s.bytecodeHealReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertBytecodeHealRequest cleans up a bytecode heal request and returns all +// failed retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertBytecodeHealRequest. func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) { log.Debug("Reverting bytecode heal request", "peer", req.peer) select { @@ -1940,8 +1985,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account logger.Debug("Peer rejected account range request", "root", s.root) s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling - s.revertAccountRequest(req) + s.scheduleRevertAccountRequest(req) return nil } root := s.root @@ -2057,8 +2103,9 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error { logger.Debug("Peer rejected bytecode request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling - s.revertBytecodeRequest(req) + s.scheduleRevertBytecodeRequest(req) return nil } s.lock.Unlock() @@ -2170,8 +2217,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots logger.Debug("Peer rejected storage request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling - s.revertStorageRequest(req) + s.scheduleRevertStorageRequest(req) return nil } s.lock.Unlock() @@ -2293,8 +2341,9 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error { logger.Debug("Peer rejected trienode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling - s.revertTrienodeHealRequest(req) + s.scheduleRevertTrienodeHealRequest(req) return nil } s.lock.Unlock() @@ -2379,8 +2428,9 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro logger.Debug("Peer rejected bytecode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + // Signal this request as failed, and ready for rescheduling - s.revertBytecodeHealRequest(req) + s.scheduleRevertBytecodeHealRequest(req) return nil } s.lock.Unlock()