Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap sessions #3867

Merged
merged 11 commits into from
Jul 7, 2017
4 changes: 2 additions & 2 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (bg *BlockGenerator) Next() *blocks.BasicBlock {
}

// Blocks generates as many BasicBlocks as specified by n.
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
57 changes: 51 additions & 6 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)

Expand Down Expand Up @@ -77,6 +78,21 @@ func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

// NewSession creates a Session which provides higher-level, request-scoped
// access to block fetching. If the BlockService's exchange is a bitswap
// instance, a dedicated bitswap session is created so related requests can
// share state; otherwise the plain exchange is used as the fetcher.
func NewSession(ctx context.Context, bs BlockService) *Session {
	exchange := bs.Exchange()
	// Prefer a real bitswap session when the exchange supports it.
	if bswap, ok := exchange.(*bitswap.Bitswap); ok {
		ses := bswap.NewSession(ctx)
		return &Session{
			ses: ses,
			bs:  bs.Blockstore(),
		}
	}
	// Fallback: the generic exchange still satisfies exchange.Fetcher.
	return &Session{
		ses: exchange,
		bs:  bs.Blockstore(),
	}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
Expand Down Expand Up @@ -141,16 +157,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.blockstore.Get(c)
var f exchange.Fetcher
if s.exchange != nil {
f = s.exchange
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed, can't we just pass s.exchange?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, because then the response wouldn't be nil (it would be a typed nil, which in interface form is != nil)


return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
block, err := bs.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.exchange != nil {
if err == blockstore.ErrNotFound && f != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.exchange.GetBlock(ctx, c)
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -172,12 +197,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.blockstore.Get(c)
hit, err := bs.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -194,7 +223,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.exchange.GetBlocks(ctx, misses)
rblocks, err := f.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -220,3 +249,19 @@ func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.exchange.Close()
}

// Session is a helper type to provide higher level access to bitswap sessions.
// Lookups consult the local blockstore first and fall back to the session's
// fetcher for anything missing.
type Session struct {
	bs  blockstore.Blockstore // local store, checked before the network
	ses exchange.Fetcher      // session-scoped (or plain exchange) fetcher
}

// GetBlock gets a block in the context of a request session. The local
// blockstore is tried first; on ErrNotFound the session fetcher is used.
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
	return getBlock(ctx, c, s.bs, s.ses)
}

// GetBlocks gets blocks in the context of a request session. Blocks found
// locally and blocks fetched via the session are delivered on the returned
// channel; no ordering guarantees are made.
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
	return getBlocks(ctx, ks, s.bs, s.ses)
}
11 changes: 10 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{
ks = append(ks, c)
}

bs.CancelWants(ks)
// TODO: This should maybe find *all* sessions for this request and cancel them?
// (why): in reality, i think this command should be removed. Its
// messing with the internal state of bitswap. You should cancel wants
// by killing the command that caused the want.
bs.CancelWants(ks, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does canceling the context not cancel the associated wants?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, cancelling the context of any request command cancels the associated wants. So this command really should just get removed, it has no real purpose

},
}

Expand Down Expand Up @@ -107,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`,
res.SetError(err, cmds.ErrNormal)
return
}
if pid == nd.Identity {
res.SetOutput(&KeyList{bs.GetWantlist()})
return
}

res.SetOutput(&KeyList{bs.WantlistForPeer(pid)})
} else {
res.SetOutput(&KeyList{bs.GetWantlist()})
Expand Down
2 changes: 1 addition & 1 deletion core/corehttp/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
multibase "gx/ipfs/QmcxkxTVuURV2Ptse8TvkqH5BQDwV62X1x19JqqvbBzwUM/go-multibase"
multibase "gx/ipfs/Qme4T6BE4sQxg7ZouamF5M7Tx1ZFTqzcns7BkyQPXpoT99/go-multibase"
)

const (
Expand Down
122 changes: 65 additions & 57 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"math"
"sync"
"sync/atomic"
"time"

blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
Expand All @@ -17,13 +18,12 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"

metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
Expand Down Expand Up @@ -99,6 +99,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
counters: new(counters),

dupMetric: dupHist,
allMetric: allHist,
Expand Down Expand Up @@ -152,17 +153,29 @@ type Bitswap struct {
process process.Process

// Counters for various statistics
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
dupDataRecvd uint64
blocksSent int
dataSent uint64
dataRecvd uint64
counterLk sync.Mutex
counters *counters

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram

// Sessions
sessions []*Session
sessLk sync.Mutex

sessID uint64
sessIDLk sync.Mutex
}

// counters groups bitswap traffic statistics. Most fields are updated under
// counterLk; messagesRecvd is incremented atomically on the message-receive
// path.
type counters struct {
	blocksRecvd    uint64 // total blocks received
	dupBlocksRecvd uint64 // received blocks the blockstore already had
	dupDataRecvd   uint64 // bytes of duplicate block data received
	blocksSent     uint64
	dataSent       uint64
	dataRecvd      uint64 // total bytes of block data received
	messagesRecvd  uint64 // bitswap messages received (atomic)
}

type blockRequest struct {
Expand All @@ -173,45 +186,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}

// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// listen on parent in this scope, but NOT okay to pass |parent| to
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)

// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer cancelFunc()

promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}

select {
case block, ok := <-promise:
if !ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, errors.New("promise channel was closed")
}
}
return block, nil
case <-parent.Done():
return nil, parent.Err()
}
return getBlock(parent, k, bs.GetBlocks)
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
Expand Down Expand Up @@ -251,7 +226,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}

bs.wm.WantBlocks(ctx, keys)
mses := bs.getNextSessionID()

bs.wm.WantBlocks(ctx, keys, nil, mses)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
Expand All @@ -273,7 +250,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
defer close(out)
defer func() {
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys())
bs.CancelWants(remaining.Keys(), mses)
}()
for {
select {
Expand All @@ -282,6 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
return
}

bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
remaining.Remove(blk.Cid())
select {
case out <- blk:
Expand All @@ -302,9 +280,19 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
}
}

// getNextSessionID issues a unique, monotonically increasing identifier for
// a new bitswap session. Returned IDs are always >= 1, leaving 0 available
// as a "no session" sentinel value for CancelWants callers.
//
// The counter is advanced atomically, so no mutex is needed (the file
// already uses sync/atomic for the message counter).
func (bs *Bitswap) getNextSessionID() uint64 {
	return atomic.AddUint64(&bs.sessID, 1)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use an atomic.

However, is there any reason this uses session IDs instead of just pointers to Session structs? Are you worried about these IDs getting stuck somewhere preventing Sessions from being freed (I get an itch when I see ints being used as pointers)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doing it this way because other bits of code need to be able to reference which sessions things belong to. Otherwise we might end up with circular dependencies since bitswap imports wantlists and wantlists reference sessions IDs.


// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(cids)
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
if len(cids) == 0 {
return
}
bs.wm.CancelWants(context.Background(), cids, nil, ses)
}

// HasBlock announces the existence of a block to this bitswap service. The
Expand Down Expand Up @@ -340,7 +328,23 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return nil
}

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid.
// The sessions slice is read under sessLk to guard against concurrent
// registration/removal of sessions.
//
// NOTE(review): interestedIn is the only filter here, and this appears to be
// the only path that routes received blocks into sessions — if a cid has
// been evicted from a session's interest set before the block arrives, the
// session will never see it. Confirm interestedIn also consults the live
// wants set.
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
	bs.sessLk.Lock()
	defer bs.sessLk.Unlock()

	var out []*Session
	for _, s := range bs.sessions {
		if s.interestedIn(c) {
			out = append(out, s)
		}
	}
	return out
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1)

// This call records changes to wantlists, blocks received,
// and number of bytes transferred.
bs.engine.MessageReceived(p, incoming)
Expand All @@ -362,7 +366,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(keys)

wg := sync.WaitGroup{}
for _, block := range iblocks {
Expand All @@ -375,6 +378,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)

for _, ses := range bs.SessionsForBlock(k) {
ses.receiveBlockFrom(p, b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm mistaken, this is the only place that feeds blocks into the session. If a block gets evicted from the interested set before we find it, we'll never forward it to the session.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooh, good catch. the 'InterestedIn' check should also check the liveWants map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what happens if the user adds the block to the local blockstore via the commandline while we're trying to fetch it from the network?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All blocks added to the datastore get added through either the blockservice or the dagservice (which wraps the blockservice). The blockservice calls "exchange.HasBlock" for each block being added to it, then the blocks get pushed through the notifications channel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that a bit racy? That is, what happens if the user adds the block after the call to blockstore.Get but before the call to exchange.GetBlock (here)?

I'm asking because there's also race condition in this PR when a block is neither in the interest cache (we're interested in too many blocks) nor in the liveWants set (we haven't started fetching it yet) but gets fetched by another session (or manually added by the user). In this case, we'll try to fetch the block from the network anyways even if we already have it.

So, I'm wondering if it's worth adding a notification framework to Blockstores where one can wait for blocks to be added. An alternative is to bitswap with ourself (but that seems kind of hacky).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That race condition exists in the code today

The first one already exists but the second one is a bit different and has a much larger window. To reproduce:

  1. Try fetching 3000 nodes that can't be found on the network.
  2. Manually add 100th node.
  3. Manually add the rest.

The operation will never complete because, when added, the 100th node was neither in the interest set nor in the liveWants set. The quick fix is to add an allWants set and use that instead of of checking liveWants and interest.


However, all these race conditions like this could be fixed by some centralized notification system (doesn't have to be built into the blockstore).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that a notification system would actually eliminate race conditions like this; it seems very tricky to do without taking a lock around all 'put block' calls that gets released once the notification has finished propagating. Will think about this more after some sleep.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be fine as long as we always notify when putting a block into the blockstore. Yes, we may end up starting a bitswap search for a block that you just got but we can cancel it immediately and don't have to wait for it to finish. My primary worry here is that the bitswap search may never finish if the block was retrieved on some side-channel and can't be found on the network.

The way I see this working is as follows:

BlockService.GetBlock will:

  1. Try the blockstore first. Else...
  2. Put a request into a "want" channel.
  3. Subscribe(cid) on the internal notification system.
  4. Call blockstore.Get again in case it was delivered between 1 and 3.
  5. Wait on the channel returned by Subscribe.

BlockService.AddBlock will:

  1. Put the block.
  2. Publish on the notification system.

The main event loop for the BlockService will:

  • Wait for requests on the want channel:
    1. Throw away duplicate want requests.
    2. Record that we're looking for the block.
    3. Asynchronously:
    4. Ask the exchange to find the block.
    5. Send the response back on an exchangeResponse channel.
  • Wait on exchangeResponse channel.
    1. Record that we're done looking for the block.
    2. Call BlockService.AddBlock(...) (triggering the notification system).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Note: this is very much "future work" in case that wasn't clear.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien I pushed a fix that at least doesn't lose track of blocks that may be added locally, and left some TODO notes for future work. I think that covers the major outstanding concerns. There will definitely be follow-up PRs, but this one should be good to go.

bs.CancelWants([]*cid.Cid{k}, ses.id)
}
log.Debugf("got block %s from %s", b, p)
if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
Expand All @@ -401,12 +408,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

bs.counterLk.Lock()
defer bs.counterLk.Unlock()
c := bs.counters

bs.blocksRecvd++
bs.dataRecvd += uint64(len(b.RawData()))
c.blocksRecvd++
c.dataRecvd += uint64(len(b.RawData()))
if has {
bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(blkLen)
c.dupBlocksRecvd++
c.dupDataRecvd += uint64(blkLen)
}
}

Expand Down
Loading