Skip to content

Commit

Permalink
WIP: wire sessions up through into FetchGraph
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping authored and Kubuxu committed May 7, 2017
1 parent e07de19 commit 348c75f
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
57 changes: 52 additions & 5 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
Expand All @@ -31,6 +32,7 @@ type BlockService interface {
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
NewSession(context.Context) *Session
Close() error
}

Expand Down Expand Up @@ -77,6 +79,21 @@ func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

func (bs *blockService) NewSession(ctx context.Context) *Session {
bswap, ok := bs.Exchange().(*bitswap.Bitswap)
if ok {
ses := bswap.NewSession(ctx)
return &Session{
ses: ses,
bs: bs.blockstore,
}
}
return &Session{
ses: bs.exchange,
bs: bs.blockstore,
}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
Expand Down Expand Up @@ -141,16 +158,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.blockstore.Get(c)
var f exchange.Fetcher
if s.exchange != nil {
f = s.exchange
}

return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
block, err := bs.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.exchange != nil {
if err == blockstore.ErrNotFound && f != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.exchange.GetBlock(ctx, c)
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -172,12 +198,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.blockstore.Get(c)
hit, err := bs.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -194,7 +224,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.exchange.GetBlocks(ctx, misses)
rblocks, err := f.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -220,3 +250,20 @@ func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.exchange.Close()
}

type Session struct {
bs blockstore.Blockstore
ses exchange.Session
}

func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
return getBlock(ctx, c, s.bs, s.ses)
}

func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.bs, s.ses)
}

func (s *Session) Close() error {
return s.ses.Close()
}
5 changes: 5 additions & 0 deletions exchange/bitswap/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
}
}

func (s *Session) Close() error {
// TODO:
return nil
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
Expand Down
16 changes: 12 additions & 4 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ import (
// Any type that implements exchange.Interface may be used as an IPFS block
// exchange protocol.
type Interface interface { // type Exchanger interface
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)

GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
Fetcher

// TODO Should callers be concerned with whether the block was made
// available on the network?
Expand All @@ -26,3 +23,14 @@ type Interface interface { // type Exchanger interface

io.Closer
}

type Fetcher interface {
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)
GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
}

type Session interface {
Fetcher
Close() error
}
25 changes: 22 additions & 3 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (n *dagService) Remove(nd node.Node) error {
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService. If the node does not exist
// locally (and can not be retrieved) an error will be returned.
func GetLinksDirect(serv DAGService) GetLinks {
func GetLinksDirect(serv node.NodeGetter) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {
Expand All @@ -163,11 +163,30 @@ func GetLinksDirect(serv DAGService) GetLinks {
}
}

type sesGetter struct {
bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
blk, err := sg.bs.GetBlock(ctx, c)
if err != nil {
return nil, err
}

return decodeBlock(blk)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
var ng node.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
ng = &sesGetter{ds.Blocks.NewSession(ctx)}
}

v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
Expand All @@ -178,7 +197,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return false
}
}
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down

0 comments on commit 348c75f

Please sign in to comment.