Skip to content

Commit

Permalink
Restartable storage deals (#270)
Browse files Browse the repository at this point in the history
* Add finality states to client/provider FSM

WIP

Remove test for ensuring that state entry fn gets executed.  This is already tested in go-statemachine.

WIP

WIP2

# Conflicts:
#	storagemarket/impl/provider.go
#	storagemarket/integration_test.go

* Restart logic for non-terminates deals

* Update go-statemachine

* PR Feedback

* Rebase

* Fix flakey test
  • Loading branch information
ingar committed Jun 15, 2020
1 parent eec6fdc commit 9a88b8a
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-data-transfer v0.3.0
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200508203401-a74812ba12f3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:9
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba h1:GEWb/6KQyNZt4jm8fgVcIFPH0ElAGXfHM59ZSiqPTvY=
github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
func TestStorageRetrieval(t *testing.T) {
bgCtx := context.Background()
sh := newStorageHarness(bgCtx, t)
sh.Client.Run(bgCtx)
require.NoError(t, sh.Client.Start(bgCtx))
require.NoError(t, sh.Provider.Start(bgCtx))

// set up a subscriber
Expand Down
16 changes: 16 additions & 0 deletions shared_testutil/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -273,3 +277,15 @@ func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket
}
return peers
}

type FakeDTValidator struct{}

func (v *FakeDTValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
return nil
}

func (v *FakeDTValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
return nil
}

var _ datatransfer.RequestValidator = (*FakeDTValidator)(nil)
86 changes: 70 additions & 16 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,11 @@ func NewClient(
conns: connmanager.NewConnManager(),
}

statemachines, err := fsm.New(ds, fsm.Parameters{
Environment: &clientDealEnvironment{c},
StateType: storagemarket.ClientDeal{},
StateKeyField: "State",
Events: clientstates.ClientEvents,
StateEntryFuncs: clientstates.ClientStateEntryFuncs,
Notifier: c.dispatch,
})
statemachines, err := NewClientStateMachine(
ds,
&clientDealEnvironment{c},
c.dispatch,
)
if err != nil {
return nil, err
}
Expand All @@ -93,11 +90,12 @@ func NewClient(
return c, nil
}

func (c *Client) Run(ctx context.Context) {
func (c *Client) Start(ctx context.Context) error {
return c.restartDeals()
}

func (c *Client) Stop() {
_ = c.statemachines.Stop(context.TODO())
func (c *Client) Stop() error {
return c.statemachines.Stop(context.TODO())
}

func (c *Client) ListProviders(ctx context.Context) (<-chan storagemarket.StorageProviderInfo, error) {
Expand Down Expand Up @@ -301,6 +299,35 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) restartDeals() error {
var deals []storagemarket.ClientDeal
err := c.statemachines.List(&deals)
if err != nil {
return err
}

for _, deal := range deals {
if c.statemachines.IsTerminated(deal) {
continue
}

if deal.ConnectionClosed {
continue
}

_, err := c.ensureDealStream(deal.Miner, deal.ProposalCid)
if err != nil {
return err
}

err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventRestart)
if err != nil {
return err
}
}
return nil
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
Expand All @@ -317,6 +344,36 @@ func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
}
}

func (c *Client) ensureDealStream(provider peer.ID, proposalCid cid.Cid) (network.StorageDealStream, error) {
s, err := c.conns.DealStream(proposalCid)
if err == nil {
return s, nil
}

s, err = c.net.NewDealStream(provider)
if err != nil {
return nil, err
}

err = c.conns.AddStream(proposalCid, s)
if err != nil {
return nil, err
}
return s, nil
}

func NewClientStateMachine(ds datastore.Datastore, env fsm.Environment, notifier fsm.Notifier) (fsm.Group, error) {
return fsm.New(ds, fsm.Parameters{
Environment: env,
StateType: storagemarket.ClientDeal{},
StateKeyField: "State",
Events: clientstates.ClientEvents,
StateEntryFuncs: clientstates.ClientStateEntryFuncs,
FinalityStates: clientstates.ClientFinalityStates,
Notifier: notifier,
})
}

type internalClientEvent struct {
evt storagemarket.ClientEvent
deal storagemarket.ClientDeal
Expand Down Expand Up @@ -348,14 +405,11 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
}

func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error {
s, err := c.c.net.NewDealStream(p)
if err != nil {
return err
}
err = c.c.conns.AddStream(proposalCid, s)
s, err := c.c.ensureDealStream(p, proposalCid)
if err != nil {
return err
}

err = s.WriteDealProposal(proposal)
return err
}
Expand Down
6 changes: 6 additions & 0 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealActive),
fsm.Event(storagemarket.ClientEventFailed).
From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError),
fsm.Event(storagemarket.ClientEventRestart).FromAny().ToNoChange(),
}

// ClientStateEntryFuncs are the handlers for different states in a storage client
Expand All @@ -123,3 +124,8 @@ var ClientStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
}

var ClientFinalityStates = []fsm.StateKey{
storagemarket.StorageDealActive,
storagemarket.StorageDealError,
}
14 changes: 14 additions & 0 deletions storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket/testnodes"
Expand Down Expand Up @@ -336,6 +338,18 @@ func TestFailDeal(t *testing.T) {
})
}

func TestFinalityStates(t *testing.T) {
group, err := storageimpl.NewClientStateMachine(nil, &fakeEnvironment{}, nil)
require.NoError(t, err)

for _, status := range []storagemarket.StorageDealStatus{
storagemarket.StorageDealActive,
storagemarket.StorageDealError,
} {
require.True(t, group.IsTerminated(storagemarket.ClientDeal{State: status}))
}
}

type envParams struct {
dealStream smnet.StorageDealStream
closeStreamErr error
Expand Down
57 changes: 49 additions & 8 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,11 @@ func NewProvider(net network.StorageMarketNetwork,
pubSub: pubsub.New(providerDispatcher),
}

deals, err := fsm.New(ds, fsm.Parameters{
Environment: &providerDealEnvironment{h},
StateType: storagemarket.MinerDeal{},
StateKeyField: "State",
Events: providerstates.ProviderEvents,
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
Notifier: h.dispatch,
})
deals, err := NewProviderStateMachine(
ds,
&providerDealEnvironment{h},
h.dispatch,
)
if err != nil {
return nil, err
}
Expand All @@ -155,6 +152,12 @@ func (p *Provider) Start(ctx context.Context) error {
if err != nil {
return err
}

err = p.restartDeals()
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -370,6 +373,44 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
}
}

func (c *Provider) restartDeals() error {
var deals []storagemarket.MinerDeal
err := c.deals.List(&deals)
if err != nil {
return err
}

for _, deal := range deals {
if c.deals.IsTerminated(deal) {
continue
}

if deal.ConnectionClosed {
continue
}

// TODO: Fixup deal streams if necessary...

err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart)
if err != nil {
return err
}
}
return nil
}

func NewProviderStateMachine(ds datastore.Datastore, env fsm.Environment, notifier fsm.Notifier) (fsm.Group, error) {
return fsm.New(ds, fsm.Parameters{
Environment: env,
StateType: storagemarket.MinerDeal{},
StateKeyField: "State",
Events: providerstates.ProviderEvents,
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
FinalityStates: providerstates.ProviderFinalityStates,
Notifier: notifier,
})
}

type internalProviderEvent struct {
evt storagemarket.ProviderEvent
deal storagemarket.MinerDeal
Expand Down
6 changes: 6 additions & 0 deletions storagemarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var ProviderEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ProviderEventFailed).From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError),
fsm.Event(storagemarket.ProviderEventRestart).FromAny().ToNoChange(),
}

// ProviderStateEntryFuncs are the handlers for different states in a storage client
Expand All @@ -136,3 +137,8 @@ var ProviderStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealActive: RecordPieceInfo,
storagemarket.StorageDealFailing: FailDeal,
}

var ProviderFinalityStates = []fsm.StateKey{
storagemarket.StorageDealError,
storagemarket.StorageDealCompleted,
}
13 changes: 13 additions & 0 deletions storagemarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
Expand Down Expand Up @@ -685,6 +686,18 @@ func TestFailDeal(t *testing.T) {
}
}

func TestFinalityStates(t *testing.T) {
group, err := storageimpl.NewProviderStateMachine(nil, &fakeEnvironment{}, nil)
require.NoError(t, err)

for _, status := range []storagemarket.StorageDealStatus{
storagemarket.StorageDealCompleted,
storagemarket.StorageDealError,
} {
require.True(t, group.IsTerminated(storagemarket.MinerDeal{State: status}))
}
}

// all of these default parameters are setup to allow a deal to complete each handler with no errors
var defaultHeight = abi.ChainEpoch(50)
var defaultTipSetToken = []byte{1, 2, 3}
Expand Down
Loading

0 comments on commit 9a88b8a

Please sign in to comment.