Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup blockexchange blockAddress handling #1051

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
10 changes: 6 additions & 4 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type DiscoveryEngine* = ref object of RootObj

proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
for address in toSeq(b.pendingBlocks.wantList):
let cid = address.cidOrTreeCid
try:
await b.discoveryQueue.put(cid)
except CancelledError:
Expand Down Expand Up @@ -87,9 +88,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
trace "Discovery request already in progress", cid
continue

let haves = b.peers.peersHave(cid)

if haves.len < b.minPeersPerBlock:
if b.peers.countPeersWhoHave(cid) < b.minPeersPerBlock:
try:
let request = b.discovery.find(cid).wait(DefaultDiscoveryTimeout)

Expand Down Expand Up @@ -126,6 +125,9 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg

proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} =
b.queueFindBlocksReq(addresses.mapIt(it.cidOrTreeCid))

proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
##
Expand Down
20 changes: 8 additions & 12 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@

# drop unresponsive peer
await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])

Check warning on line 180 in codex/blockexchange/engine/engine.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/engine.nim#L180

Added line #L180 was not covered by tests

proc pickPseudoRandom(
address: BlockAddress, peers: seq[BlockExcPeerCtx]
Expand All @@ -193,7 +193,7 @@
let peers = b.peers.getPeersForBlock(address)

if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
Expand Down Expand Up @@ -242,25 +242,21 @@
# if none of the connected peers report our wants in their have list,
# fire up discovery
b.discovery.queueFindBlocksReq(
toSeq(b.pendingBlocks.wantListCids).filter do(cid: Cid) -> bool:
not b.peers.anyIt(cid in it.peerHaveCids)
toSeq(b.pendingBlocks.wantList).filterIt(b.peers.peersHave(it).len == 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trusting those two things are the same....

)

proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
let cids = blocksDelivery.mapIt(it.blk.cid)

let addresses = blocksDelivery.mapIt(it.address)
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWantsCids:
if await (c in b.localStore):
for address in addresses:
# schedule a peer if it wants at least one
if address in p.peerWants:
if await (address in b.localStore):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
warn "Unable to schedule task for peer", peer = p.id

break # do next peer

proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
Expand Down
13 changes: 0 additions & 13 deletions codex/blockexchange/engine/pendingblocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@ iterator wantList*(p: PendingBlocksManager): BlockAddress =
for a in p.blocks.keys:
yield a

iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the actual dead code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, unused code.

for a in p.blocks.keys:
if not a.leaf:
yield a.cid

iterator wantListCids*(p: PendingBlocksManager): Cid =
var yieldedCids = initHashSet[Cid]()
for a in p.blocks.keys:
let cid = a.cidOrTreeCid
if cid notin yieldedCids:
yieldedCids.incl(cid)
yield cid

iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values:
yield v.handle
Expand Down
6 changes: 0 additions & 6 deletions codex/blockexchange/peers/peercontext.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type BlockExcPeerCtx* = ref object of RootObj
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
toSeq(self.blocks.keys)

proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this 🙂

self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet

proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet

proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocks

Expand Down
7 changes: 2 additions & 5 deletions codex/blockexchange/peers/peerctxstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,12 @@ func len*(self: PeerCtxStore): int =
func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))

func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))
func countPeersWhoHave*(self: PeerCtxStore, cid: Cid): int =
self.peers.values.countIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))

func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))

func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))

proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock()
for peer in self:
Expand Down
4 changes: 2 additions & 2 deletions tests/codex/blockexchange/testpendingblocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ checksuite "Pending Blocks":
discard blks.mapIt(pendingBlocks.getWantHandle(it.cid))

check:
blks.mapIt($it.cid).sorted(cmp[string]) ==
toSeq(pendingBlocks.wantListBlockCids).mapIt($it).sorted(cmp[string])
blks.mapIt($BlockAddress.init(it.cid)).sorted(cmp[string]) ==
toSeq(pendingBlocks.wantList).mapIt($it).sorted(cmp[string])

test "Should get want handles list":
let
Expand Down
Loading