Skip to content

Commit

Permalink
Use go-statemachine + FSMs in retrieval market (#124)
Browse files Browse the repository at this point in the history
* feat(retrievalmarket): convert to state machines

convert retrieval market to using go-statemachine, specifically the FSM module

* refactor(retrievalmarket): update statemachine + PR stuff

update statemachine to latest and address PR comments

* fix(clientstates): update block processing

* fix(deps): mod tidy
  • Loading branch information
hannahhoward authored Feb 26, 2020
1 parent 1f82dd5 commit 5ee324c
Show file tree
Hide file tree
Showing 20 changed files with 1,345 additions and 775 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200210220012-eb75ec747d6b
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/specs-actors v0.0.0-20200220011005-b2a2fbf40362
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200210220012-eb75ec747d6b h1:ds4TQay8wuV+2ucC6ENAeSYQDdl9CWYXnX0gvxzGKHg=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200210220012-eb75ec747d6b/go.mod h1:qsuPYsbKTHH2phNk81aUF9VJIilUxFrnxxnryJh4FOM=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf h1:fbxBG12yrxilPFV1EG2lYqpUyAlRZWkvtqjk2svSeXY=
Expand Down Expand Up @@ -220,6 +222,9 @@ github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.0 h1:BW3LQIiZzpNyolt84yvKNCd3FU+AK4VDw1hnHR+1aiI=
github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA=
github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.2 h1:xguurydRdfKMJjKyxNXNU8lYP0VZH1NUwJRwUorjuEw=
github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
Expand Down Expand Up @@ -719,6 +724,8 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
13 changes: 6 additions & 7 deletions retrievalmarket/impl/blockunsealing/blockunsealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ type LoaderWithUnsealing interface {
}

type loaderWithUnsealing struct {
ctx context.Context
bs blockstore.Blockstore
pieceStore piecestore.PieceStore
carIO pieceio.CarIO
unsealer UnsealingFunc
alreadyUnsealed bool
ctx context.Context
bs blockstore.Blockstore
pieceStore piecestore.PieceStore
carIO pieceio.CarIO
unsealer UnsealingFunc
}

// UnsealingFunc is a function that unseals sectors at a given offset and length
Expand All @@ -35,7 +34,7 @@ type UnsealingFunc func(ctx context.Context, sectorId uint64, offset uint64, len
// NewLoaderWithUnsealing creates a loader that will attempt to read blocks from the blockstore but unseal the piece
// as needed using the passed unsealing function
func NewLoaderWithUnsealing(ctx context.Context, bs blockstore.Blockstore, pieceStore piecestore.PieceStore, carIO pieceio.CarIO, unsealer UnsealingFunc) LoaderWithUnsealing {
return &loaderWithUnsealing{ctx, bs, pieceStore, carIO, unsealer, false}
return &loaderWithUnsealing{ctx, bs, pieceStore, carIO, unsealer}
}

func (lu *loaderWithUnsealing) Load(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
Expand Down
140 changes: 67 additions & 73 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
)

Expand All @@ -32,25 +35,48 @@ type client struct {
nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID

subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
resolver retrievalmarket.PeerResolver
subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
resolver retrievalmarket.PeerResolver
blockVerifiers map[retrievalmarket.DealID]blockio.BlockVerifier
dealStreams map[retrievalmarket.DealID]rmnet.RetrievalDealStream
stateMachines fsm.Group
}

var _ retrievalmarket.RetrievalClient = &client{}

// ClientDsPrefix is the datastore for the client retrievals key
var ClientDsPrefix = "/retrievals/client"

// NewClient creates a new retrieval client
func NewClient(
network rmnet.RetrievalMarketNetwork,
bs blockstore.Blockstore,
node retrievalmarket.RetrievalClientNode,
resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
return &client{
network: network,
bs: bs,
node: node,
resolver: resolver,
resolver retrievalmarket.PeerResolver,
ds datastore.Batching,
) (retrievalmarket.RetrievalClient, error) {
c := &client{
network: network,
bs: bs,
node: node,
resolver: resolver,
dealStreams: make(map[retrievalmarket.DealID]rmnet.RetrievalDealStream),
blockVerifiers: make(map[retrievalmarket.DealID]blockio.BlockVerifier),
}
stateMachines, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ClientDsPrefix)), fsm.Parameters{
Environment: c,
StateType: retrievalmarket.ClientDealState{},
StateKeyField: "Status",
Events: clientstates.ClientEvents,
StateEntryFuncs: clientstates.ClientStateEntryFuncs,
Notifier: c.notifySubscribers,
})
if err != nil {
return nil, err
}
c.stateMachines = stateMachines
return c, nil
}

// V0
Expand Down Expand Up @@ -84,9 +110,7 @@ func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pay
}

// Retrieve begins the process of requesting the data referred to by payloadCID, after a deal is accepted
func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) retrievalmarket.DealID {
/* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces
-- it will be replaced when we do the V0 implementation of the module */
func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) (retrievalmarket.DealID, error) {
c.nextDealLk.Lock()
c.nextDealID++
dealID := c.nextDealID
Expand All @@ -110,58 +134,28 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
Sender: miner,
}

go c.handleDeal(ctx, dealState)

return dealID
}

func (c *client) failDeal(dealState *retrievalmarket.ClientDealState, err error) {
dealState.Message = err.Error()
dealState.Status = retrievalmarket.DealStatusFailed
c.notifySubscribers(retrievalmarket.ClientEventError, *dealState)
}

func (c *client) handleDeal(ctx context.Context, dealState retrievalmarket.ClientDealState) {

c.notifySubscribers(retrievalmarket.ClientEventOpen, dealState)
// start the deal processing
err := c.stateMachines.Begin(dealState.ID, &dealState)
if err != nil {
return 0, err
}

// open stream
s, err := c.network.NewDealStream(dealState.Sender)
if err != nil {
c.failDeal(&dealState, err)
return
return 0, err
}
defer s.Close()

environment := clientDealEnvironment{c.node, blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}), c.bs, s}

for {
var handler clientstates.ClientHandlerFunc

switch dealState.Status {
case retrievalmarket.DealStatusNew:
handler = clientstates.ProposeDeal
case retrievalmarket.DealStatusAccepted:
handler = clientstates.SetupPaymentChannel
case retrievalmarket.DealStatusPaymentChannelCreated, retrievalmarket.DealStatusOngoing, retrievalmarket.DealStatusUnsealing:
handler = clientstates.ProcessNextResponse
case retrievalmarket.DealStatusFundsNeeded, retrievalmarket.DealStatusFundsNeededLastPayment:
handler = clientstates.ProcessPaymentRequested
default:
c.failDeal(&dealState, xerrors.New("unexpected deal state"))
return
}
dealModifier := handler(ctx, &environment, dealState)
dealModifier(&dealState)
if retrievalmarket.IsTerminalStatus(dealState.Status) {
break
}
c.notifySubscribers(retrievalmarket.ClientEventProgress, dealState)
}
if retrievalmarket.IsTerminalSuccess(dealState.Status) {
c.notifySubscribers(retrievalmarket.ClientEventComplete, dealState)
} else {
c.notifySubscribers(retrievalmarket.ClientEventError, dealState)
c.dealStreams[dealID] = s
c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID})

err = c.stateMachines.Send(dealState.ID, retrievalmarket.ClientEventOpen)
if err != nil {
s.Close()
return 0, err
}

return dealID, nil
}

// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
Expand All @@ -182,9 +176,11 @@ func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalma
}
}

func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) {
func (c *client) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
c.subscribersLk.RLock()
defer c.subscribersLk.RUnlock()
evt := eventName.(retrievalmarket.ClientEvent)
ds := state.(retrievalmarket.ClientDealState)
for _, cb := range c.subscribers {
cb(evt, ds)
}
Expand Down Expand Up @@ -215,22 +211,15 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe
panic("not implemented")
}

type clientDealEnvironment struct {
node retrievalmarket.RetrievalClientNode
verifier blockio.BlockVerifier
bs blockstore.Blockstore
stream rmnet.RetrievalDealStream
}

func (cde *clientDealEnvironment) Node() retrievalmarket.RetrievalClientNode {
return cde.node
func (c *client) Node() retrievalmarket.RetrievalClientNode {
return c.node
}

func (cde *clientDealEnvironment) DealStream() rmnet.RetrievalDealStream {
return cde.stream
func (c *client) DealStream(dealID retrievalmarket.DealID) rmnet.RetrievalDealStream {
return c.dealStreams[dealID]
}

func (cde *clientDealEnvironment) ConsumeBlock(ctx context.Context, block retrievalmarket.Block) (uint64, bool, error) {
func (c *client) ConsumeBlock(ctx context.Context, dealID retrievalmarket.DealID, block retrievalmarket.Block) (uint64, bool, error) {
prefix, err := cid.PrefixFromBytes(block.Prefix)
if err != nil {
return 0, false, err
Expand All @@ -246,15 +235,20 @@ func (cde *clientDealEnvironment) ConsumeBlock(ctx context.Context, block retrie
return 0, false, err
}

done, err := cde.verifier.Verify(ctx, blk)
verifier, ok := c.blockVerifiers[dealID]
if !ok {
return 0, false, xerrors.New("no block verifier found")
}

done, err := verifier.Verify(ctx, blk)
if err != nil {
log.Warnf("block verify failed: %s", err)
return 0, false, err
}

// TODO: Smarter out, maybe add to filestore automagically
// (Also, persist intermediate nodes)
err = cde.bs.Put(blk)
err = c.bs.Put(blk)
if err != nil {
log.Warnf("block write failed: %s", err)
return 0, false, err
Expand Down
39 changes: 26 additions & 13 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
func TestClient_Query(t *testing.T) {
ctx := context.Background()

bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(ds)

pcid := tut.GenerateCids(1)[0]
expectedPeer := peer.ID("somevalue")
Expand Down Expand Up @@ -58,7 +59,8 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})
c, err := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
require.NoError(t, err)

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
Expand All @@ -70,10 +72,11 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})
c, err := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
require.NoError(t, err)

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
_, err = c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
})

Expand All @@ -90,8 +93,9 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})
c, err := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
require.NoError(t, err)

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
Expand All @@ -109,11 +113,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(
c, err := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{})
&testPeerResolver{},
ds)
require.NoError(t, err)

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
Expand All @@ -122,7 +128,8 @@ func TestClient_Query(t *testing.T) {
}

func TestClient_FindProviders(t *testing.T) {
bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(ds)
expectedPeer := peer.ID("somevalue")

var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
Expand All @@ -139,21 +146,27 @@ func TestClient_FindProviders(t *testing.T) {
peers := tut.RequireGenerateRetrievalPeers(t, 3)
testResolver := testPeerResolver{peers: peers}

c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
assert.Len(t, c.FindProviders(testCid), 3)
})

t.Run("when there is an error, returns empty provider list", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}, resolverError: errors.New("boom")}
c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
require.NoError(t, err)

badCid := tut.GenerateCids(1)[0]
assert.Len(t, c.FindProviders(badCid), 0)
})

t.Run("when there are no providers", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}}
c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
assert.Len(t, c.FindProviders(testCid), 0)
})
Expand Down
Loading

0 comments on commit 5ee324c

Please sign in to comment.