Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add blocks to the blockstore before returning them from blockservice sessions. #4169

Merged
merged 1 commit into from
Aug 25, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlockFrom(blk, "")
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -317,8 +325,11 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk)

for _, s := range bs.SessionsForBlock(blk.Cid()) {
s.receiveBlockFrom("", blk)
k := blk.Cid()
ks := []*cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrm... the change from "" to from here is on purpose? Whats the reasoning? ( i dont remember all the implications without my coffee)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I merged two functions. from will be "" when the user calls HasBlock but will be the peer from which we received the block when called from within bitswap.

Basically, we had two functions, HasBlock and a go routine inside ReceiveMessage where:

  1. The second function would:
  2. Notify all interested sessions that we had received the block.
  3. Remove the block from sessoins' wantlists.
  4. Call the first function which would:
  5. Put the block in the blockstore.
  6. Publish a notification on bs.notifications.
  7. Notify the session that we received the block (again) but claiming to have received it from "". This was to prevent a race condition where we'd notify the session that we received the block before putting it in the blockstore and then someone would try to get the block from the blockstore, not find it, and ask bitswap for it. I believe we should also have removed this block from the session but I'm not sure why we didn't (the new code does).
  8. Record the block in the bitswap engine.
  9. Reprovide the block.

I got rid of the first function, moved step 3 into step 7, and made it notify the session with the correct peer in step 7 if we received the block from the network.


The bug was that race condition in step 7. The fix works iff you keep the same bitswap instance but, if you replace the bitswap instance with a new one, the caller will not find the block in the blockstore and ask the new bitswap instance for the block (which doesn't know about it).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, that sounds good. Any way you could write a test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestFetchGraph? 😄 It's a bit hard to test a race condition...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this is fixing a test failure so there is, by definition, a test case).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay fineeee

bs.CancelWants(ks, s.id)
}

bs.engine.AddBlock(blk)
Expand Down Expand Up @@ -379,21 +390,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

bs.updateReceiveCounters(b)

k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)

for _, ses := range bs.SessionsForBlock(k) {
ses.receiveBlockFrom(p, b)
bs.CancelWants([]*cid.Cid{k}, ses.id)
}

log.Debugf("got block %s from %s", b, p)
// TODO: rework this to not call 'HasBlock'. 'HasBlock' is really
// designed to be called when blocks are coming in from non-bitswap
// places (like the user manually adding data)
if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)

if err := bs.receiveBlockFrom(b, p); err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
}
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}(block)
}
wg.Wait()
Expand Down