Skip to content

Commit

Permalink
add selector to BlockIO classes (#178)
Browse files Browse the repository at this point in the history
* add selector to BlockIO classes

* use ipld.Node and not selector.Selector as param

* use ipld.Node and not selector.Selector as param
  • Loading branch information
shannonwells authored Apr 1, 2020
1 parent 6f76036 commit 473c5d1
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 20 deletions.
8 changes: 5 additions & 3 deletions retrievalmarket/impl/blockio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ type BlockReader interface {
// allowing the next block to be read and then advancing no further
type SelectorBlockReader struct {
root ipld.Link
selector ipld.Node
loader ipld.Loader
traverser *Traverser
}

// NewSelectorBlockReader returns a new Block reader starting at the given
// root and using the given loader
func NewSelectorBlockReader(root ipld.Link, loader ipld.Loader) BlockReader {
return &SelectorBlockReader{root, loader, nil}
func NewSelectorBlockReader(root ipld.Link, sel ipld.Node, loader ipld.Loader) BlockReader {
return &SelectorBlockReader{root, sel, loader, nil}
}

// ReadBlock reads the next block in the IPLD traversal
func (sr *SelectorBlockReader) ReadBlock(ctx context.Context) (retrievalmarket.Block, bool, error) {

if sr.traverser == nil {
sr.traverser = NewTraverser(sr.root)
sr.traverser = NewTraverser(sr.root, sr.selector)
sr.traverser.Start(ctx)
}
lnk, lnkCtx := sr.traverser.CurrentRequest(ctx)
Expand Down
8 changes: 7 additions & 1 deletion retrievalmarket/impl/blockio/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -16,8 +19,11 @@ func TestSelectorReader(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("reads correctly", func(t *testing.T) {
reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, testdata.Loader)
reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, sel, testdata.Loader)

checkReadSequence(ctx, t, reader, []blocks.Block{
testdata.RootBlock,
Expand Down
11 changes: 5 additions & 6 deletions retrievalmarket/impl/blockio/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
free "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

type state struct {
Expand All @@ -28,6 +27,7 @@ type nextResponse struct {
// and waits for manual input (in the form of advance or error)
type Traverser struct {
root ipld.Link
selector ipld.Node
currentLink ipld.Link
currentContext ipld.LinkContext
isDone bool
Expand All @@ -51,9 +51,10 @@ func (t *Traverser) checkState(ctx context.Context) {
}

// NewTraverser creates a new traverser
func NewTraverser(root ipld.Link) *Traverser {
func NewTraverser(root ipld.Link, selector ipld.Node) *Traverser {
return &Traverser{
root: root,
selector: selector,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
Expand Down Expand Up @@ -97,10 +98,8 @@ func (t *Traverser) Start(ctx context.Context) {
t.writeDone(ctx)
return
}
ssb := builder.NewSelectorSpecBuilder(free.NodeBuilder())

allSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Selector()
sel, err := selector.ParseSelector(t.selector)
if err != nil {
t.writeDone(ctx)
return
Expand All @@ -111,7 +110,7 @@ func (t *Traverser) Start(ctx context.Context) {
LinkLoader: loader,
LinkNodeBuilderChooser: chooser,
},
}.WalkAdv(nd, allSelector, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
}.WalkAdv(nd, sel, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
t.writeDone(ctx)
}()

Expand Down
8 changes: 7 additions & 1 deletion retrievalmarket/impl/blockio/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -17,8 +20,11 @@ func TestTraverser(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("traverses correctly", func(t *testing.T) {
traverser := blockio.NewTraverser(testdata.RootNodeLnk)
traverser := blockio.NewTraverser(testdata.RootNodeLnk, sel)
traverser.Start(ctx)
checkTraverseSequence(ctx, t, traverser, []blocks.Block{
testdata.RootBlock,
Expand Down
7 changes: 4 additions & 3 deletions retrievalmarket/impl/blockio/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ type BlockVerifier interface {
// in the order they are traversed in a dag walk
type SelectorVerifier struct {
root ipld.Link
selector ipld.Node
traverser *Traverser
}

// NewSelectorVerifier returns a new selector based block verifier
func NewSelectorVerifier(root ipld.Link) BlockVerifier {
return &SelectorVerifier{root, nil}
func NewSelectorVerifier(root ipld.Link, selector ipld.Node) BlockVerifier {
return &SelectorVerifier{root, selector, nil}
}

// Verify verifies that the given block is the next one needed for the current traversal
// and returns true if the traversal is done
func (sv *SelectorVerifier) Verify(ctx context.Context, blk blocks.Block) (done bool, err error) {
if sv.traverser == nil {
sv.traverser = NewTraverser(sv.root)
sv.traverser = NewTraverser(sv.root, sv.selector)
sv.traverser.Start(ctx)
}
if sv.traverser.IsComplete(ctx) {
Expand Down
15 changes: 11 additions & 4 deletions retrievalmarket/impl/blockio/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -15,8 +18,11 @@ func TestSelectorVerifier(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("verifies correctly", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, false, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand All @@ -29,15 +35,16 @@ func TestSelectorVerifier(t *testing.T) {
testdata.LeafAlphaBlock,
})
})

t.Run("fed incorrect block", func(t *testing.T) {
t.Run("right away", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.LeafAlphaBlock,
})
})
t.Run("in middle", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand All @@ -46,7 +53,7 @@ func TestSelectorVerifier(t *testing.T) {
})
})
t.Run("at end", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
}

c.dealStreams[dealID] = s
c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID})

c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}, allSelector())

err = c.stateMachines.Send(dealState.ID, retrievalmarket.ClientEventOpen)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions retrievalmarket/impl/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package retrievalimpl

import (
"github.com/ipld/go-ipld-prime"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

func allSelector() ipld.Node {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
return ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).
Node()
}
3 changes: 2 additions & 1 deletion retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error {
p.dealStreams[pds.Identifier()] = stream

loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(context.TODO(), p.bs, p.pieceStore, cario.NewCarIO(), p.node.UnsealSector)
br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, loaderWithUnsealing.Load)

br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, allSelector(), loaderWithUnsealing.Load)
p.blockReaders[pds.Identifier()] = br

// start the deal processing, synchronously so we can log the error and close the stream if it doesn't start
Expand Down

0 comments on commit 473c5d1

Please sign in to comment.