diff --git a/go.mod b/go.mod index 69efcbfc..66ac3052 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-data-transfer v0.3.0 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 - github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 + github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/sector-storage v0.0.0-20200508203401-a74812ba12f3 diff --git a/go.sum b/go.sum index 4a2a24cb..cec8add2 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,8 @@ github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:9 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= -github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0= -github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= +github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba h1:GEWb/6KQyNZt4jm8fgVcIFPH0ElAGXfHM59ZSiqPTvY= +github.com/filecoin-project/go-statemachine v0.0.0-20200612181802-4eb3d0c68eba/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index 5ae55425..1e407000 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -46,7 +46,7 @@ import ( func TestStorageRetrieval(t *testing.T) { bgCtx := context.Background() sh := newStorageHarness(bgCtx, t) - sh.Client.Run(bgCtx) + require.NoError(t, sh.Client.Start(bgCtx)) require.NoError(t, sh.Provider.Start(bgCtx)) // set up a subscriber diff --git a/shared_testutil/generators.go b/shared_testutil/generators.go index d1297c7b..017bc411 100644 --- a/shared_testutil/generators.go +++ b/shared_testutil/generators.go @@ -7,10 +7,14 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" "github.com/stretchr/testify/require" @@ -273,3 +277,15 @@ func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket } return peers } + +type FakeDTValidator struct{} + +func (v *FakeDTValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { + return nil +} + +func (v *FakeDTValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { + return nil +} + +var _ datatransfer.RequestValidator = (*FakeDTValidator)(nil) diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 9af62ba4..3835742f 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -74,14 +74,11 @@ func NewClient( conns: connmanager.NewConnManager(), } - statemachines, err := fsm.New(ds, fsm.Parameters{ - Environment: &clientDealEnvironment{c}, - StateType: storagemarket.ClientDeal{}, - StateKeyField: "State", - Events: clientstates.ClientEvents, - StateEntryFuncs: clientstates.ClientStateEntryFuncs, - Notifier: c.dispatch, - }) + statemachines, err := NewClientStateMachine( + ds, + &clientDealEnvironment{c}, + c.dispatch, + ) if err != nil { return nil, err } @@ -93,11 +90,12 @@ func NewClient( return c, nil } -func (c *Client) Run(ctx context.Context) { +func (c *Client) Start(ctx context.Context) error { + return c.restartDeals() } -func (c *Client) Stop() { - _ = c.statemachines.Stop(context.TODO()) +func (c *Client) Stop() error { + return c.statemachines.Stop(context.TODO()) } func (c *Client) ListProviders(ctx context.Context) (<-chan storagemarket.StorageProviderInfo, error) { @@ -301,6 +299,35 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh return shared.Unsubscribe(c.pubSub.Subscribe(subscriber)) } +func (c *Client) restartDeals() error { + var deals []storagemarket.ClientDeal + err := c.statemachines.List(&deals) + if err != nil { + return err + } + + for _, deal := range deals { + if c.statemachines.IsTerminated(deal) { + continue + } + + if deal.ConnectionClosed { + continue + } + + _, err := c.ensureDealStream(deal.Miner, deal.ProposalCid) + if err != nil { + return err + } + + err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventRestart) + if err != nil { + return err + } + } + return nil +} + func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) { evt, ok := eventName.(storagemarket.ClientEvent) if !ok { @@ -317,6 +344,36 @@ func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) { } } +func (c *Client) ensureDealStream(provider peer.ID, proposalCid cid.Cid) (network.StorageDealStream, error) { + s, err := c.conns.DealStream(proposalCid) + if err == nil { + return s, nil + } + + s, err = c.net.NewDealStream(provider) + if err != nil { + return nil, err + } + + err = c.conns.AddStream(proposalCid, s) + if err != nil { + return nil, err + } + return s, nil +} + +func NewClientStateMachine(ds datastore.Datastore, env fsm.Environment, notifier fsm.Notifier) (fsm.Group, error) { + return fsm.New(ds, fsm.Parameters{ + Environment: env, + StateType: storagemarket.ClientDeal{}, + StateKeyField: "State", + Events: clientstates.ClientEvents, + StateEntryFuncs: clientstates.ClientStateEntryFuncs, + FinalityStates: clientstates.ClientFinalityStates, + Notifier: notifier, + }) +} + type internalClientEvent struct { evt storagemarket.ClientEvent deal storagemarket.ClientDeal @@ -348,14 +405,11 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode { } func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposalCid cid.Cid, proposal network.Proposal) error { - s, err := c.c.net.NewDealStream(p) - if err != nil { - return err - } - err = c.c.conns.AddStream(proposalCid, s) + s, err := c.c.ensureDealStream(p, proposalCid) if err != nil { return err } + err = s.WriteDealProposal(proposal) return err } diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index dd6e7a29..246e4b5a 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -110,6 +110,7 @@ var ClientEvents = fsm.Events{ From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealActive), fsm.Event(storagemarket.ClientEventFailed). From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError), + fsm.Event(storagemarket.ClientEventRestart).FromAny().ToNoChange(), } // ClientStateEntryFuncs are the handlers for different states in a storage client @@ -123,3 +124,8 @@ var ClientStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealSealing: VerifyDealActivated, storagemarket.StorageDealFailing: FailDeal, } + +var ClientFinalityStates = []fsm.StateKey{ + storagemarket.StorageDealActive, + storagemarket.StorageDealError, +} diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index e6080f14..e38ed91e 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -18,9 +18,11 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" + storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" @@ -336,6 +338,18 @@ func TestFailDeal(t *testing.T) { }) } +func TestFinalityStates(t *testing.T) { + group, err := storageimpl.NewClientStateMachine(nil, &fakeEnvironment{}, nil) + require.NoError(t, err) + + for _, status := range []storagemarket.StorageDealStatus{ + storagemarket.StorageDealActive, + storagemarket.StorageDealError, + } { + require.True(t, group.IsTerminated(storagemarket.ClientDeal{State: status})) + } +} + type envParams struct { dealStream smnet.StorageDealStream closeStreamErr error diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 79efcd52..05e483f3 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -127,14 +127,11 @@ func NewProvider(net network.StorageMarketNetwork, pubSub: pubsub.New(providerDispatcher), } - deals, err := fsm.New(ds, fsm.Parameters{ - Environment: &providerDealEnvironment{h}, - StateType: storagemarket.MinerDeal{}, - StateKeyField: "State", - Events: providerstates.ProviderEvents, - StateEntryFuncs: providerstates.ProviderStateEntryFuncs, - Notifier: h.dispatch, - }) + deals, err := NewProviderStateMachine( + ds, + &providerDealEnvironment{h}, + h.dispatch, + ) if err != nil { return nil, err } @@ -155,6 +152,12 @@ func (p *Provider) Start(ctx context.Context) error { if err != nil { return err } + + err = p.restartDeals() + if err != nil { + return err + } + return nil } @@ -370,6 +373,44 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) { } } +func (c *Provider) restartDeals() error { + var deals []storagemarket.MinerDeal + err := c.deals.List(&deals) + if err != nil { + return err + } + + for _, deal := range deals { + if c.deals.IsTerminated(deal) { + continue + } + + if deal.ConnectionClosed { + continue + } + + // TODO: Fixup deal streams if necessary... + + err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart) + if err != nil { + return err + } + } + return nil +} + +func NewProviderStateMachine(ds datastore.Datastore, env fsm.Environment, notifier fsm.Notifier) (fsm.Group, error) { + return fsm.New(ds, fsm.Parameters{ + Environment: env, + StateType: storagemarket.MinerDeal{}, + StateKeyField: "State", + Events: providerstates.ProviderEvents, + StateEntryFuncs: providerstates.ProviderStateEntryFuncs, + FinalityStates: providerstates.ProviderFinalityStates, + Notifier: notifier, + }) +} + type internalProviderEvent struct { evt storagemarket.ProviderEvent deal storagemarket.MinerDeal diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 083f4da3..7f8e7c2f 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -120,6 +120,7 @@ var ProviderEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ProviderEventFailed).From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError), + fsm.Event(storagemarket.ProviderEventRestart).FromAny().ToNoChange(), } // ProviderStateEntryFuncs are the handlers for different states in a storage client @@ -136,3 +137,8 @@ var ProviderStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealActive: RecordPieceInfo, storagemarket.StorageDealFailing: FailDeal, } + +var ProviderFinalityStates = []fsm.StateKey{ + storagemarket.StorageDealError, + storagemarket.StorageDealCompleted, +} diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 1be48fba..663896fd 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -25,6 +25,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" + storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -685,6 +686,18 @@ func TestFailDeal(t *testing.T) { } } +func TestFinalityStates(t *testing.T) { + group, err := storageimpl.NewProviderStateMachine(nil, &fakeEnvironment{}, nil) + require.NoError(t, err) + + for _, status := range []storagemarket.StorageDealStatus{ + storagemarket.StorageDealCompleted, + storagemarket.StorageDealError, + } { + require.True(t, group.IsTerminated(storagemarket.MinerDeal{State: status})) + } +} + // all of these default parameters are setup to allow a deal to complete each handler with no errors var defaultHeight = abi.ChainEpoch(50) var defaultTipSetToken = []byte{1, 2, 3} diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index d6080e7f..d0266dee 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -7,20 +7,18 @@ import ( "io/ioutil" "math/rand" "path/filepath" + "sync" "testing" "time" "github.com/filecoin-project/go-address" - datatransfer "github.com/filecoin-project/go-data-transfer" graphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -42,9 +40,8 @@ import ( func TestMakeDeal(t *testing.T) { ctx := context.Background() h := newHarness(t, ctx) - h.Client.Run(ctx) - err := h.Provider.Start(ctx) - assert.NoError(t, err) + require.NoError(t, h.Provider.Start(ctx)) + require.NoError(t, h.Client.Start(ctx)) // set up a subscriber providerDealChan := make(chan storagemarket.MinerDeal) @@ -71,7 +68,7 @@ func TestMakeDeal(t *testing.T) { _ = h.Client.SubscribeToEvents(clientSubscriber) // set ask price where we'll accept any price - err = h.Provider.SetAsk(big.NewInt(0), 50_000) + err := h.Provider.SetAsk(big.NewInt(0), 50_000) assert.NoError(t, err) result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) @@ -152,7 +149,7 @@ func TestMakeDeal(t *testing.T) { func TestMakeDealOffline(t *testing.T) { ctx := context.Background() h := newHarness(t, ctx) - h.Client.Run(ctx) + require.NoError(t, h.Client.Start(ctx)) carBuf := new(bytes.Buffer) @@ -209,13 +206,12 @@ func TestMakeDealNonBlocking(t *testing.T) { h := newHarness(t, ctx) testCids := shared_testutil.GenerateCids(2) - h.ClientNode.AddFundsCid = testCids[0] - h.Client.Run(ctx) - h.ProviderNode.WaitForMessageBlocks = true h.ProviderNode.AddFundsCid = testCids[1] - err := h.Provider.Start(ctx) - assert.NoError(t, err) + require.NoError(t, h.Provider.Start(ctx)) + + h.ClientNode.AddFundsCid = testCids[0] + require.NoError(t, h.Client.Start(ctx)) result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) @@ -234,6 +230,70 @@ func TestMakeDealNonBlocking(t *testing.T) { shared_testutil.AssertDealState(t, storagemarket.StorageDealProviderFunding, pd.State) } +func TestRestartClient(t *testing.T) { + ctx := context.Background() + h := newHarness(t, ctx) + + require.NoError(t, h.Provider.Start(ctx)) + require.NoError(t, h.Client.Start(ctx)) + + // set ask price where we'll accept any price + err := h.Provider.SetAsk(big.NewInt(0), 50_000) + assert.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + _ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + if event == storagemarket.ClientEventFundsEnsured { + // Stop the client and provider at some point during deal negotiation + require.NoError(t, h.Client.Stop()) + require.NoError(t, h.Provider.Stop()) + wg.Done() + } + }) + + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) + proposalCid := result.ProposalCid + + wg.Wait() + + cd, err := h.Client.GetLocalDeal(ctx, proposalCid) + assert.NoError(t, err) + assert.NotEqual(t, storagemarket.StorageDealActive, cd.State) + + h = newHarnessWithTestData(t, ctx, h.TestData, h.SMState) + + wg.Add(1) + _ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + if event == storagemarket.ClientEventDealActivated { + wg.Done() + } + }) + + wg.Add(1) + _ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + if event == storagemarket.ProviderEventDealCompleted { + wg.Done() + } + }) + + require.NoError(t, h.Provider.Start(ctx)) + require.NoError(t, h.Client.Start(ctx)) + + wg.Wait() + + cd, err = h.Client.GetLocalDeal(ctx, proposalCid) + assert.NoError(t, err) + shared_testutil.AssertDealState(t, storagemarket.StorageDealActive, cd.State) + + providerDeals, err := h.Provider.ListLocalDeals() + assert.NoError(t, err) + + pd := providerDeals[0] + assert.Equal(t, pd.ProposalCid, proposalCid) + shared_testutil.AssertDealState(t, storagemarket.StorageDealCompleted, pd.State) +} + type harness struct { Ctx context.Context Epoch abi.ChainEpoch @@ -243,18 +303,22 @@ type harness struct { ClientNode *testnodes.FakeClientNode Provider storagemarket.StorageProvider ProviderNode *testnodes.FakeProviderNode + SMState *testnodes.StorageMarketState ProviderInfo storagemarket.StorageProviderInfo TestData *shared_testutil.Libp2pTestData } func newHarness(t *testing.T, ctx context.Context) *harness { + smState := testnodes.NewStorageMarketState() + return newHarnessWithTestData(t, ctx, shared_testutil.NewLibp2pTestData(ctx, t), smState) +} + +func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testutil.Libp2pTestData, smState *testnodes.StorageMarketState) *harness { epoch := abi.ChainEpoch(100) - td := shared_testutil.NewLibp2pTestData(ctx, t) fpath := filepath.Join("storagemarket", "fixtures", "payload.txt") rootLink := td.LoadUnixFSFile(t, fpath, false) payloadCid := rootLink.(cidlink.Link).Cid - smState := testnodes.NewStorageMarketState() clientNode := testnodes.FakeClientNode{ FakeCommonNode: testnodes.FakeCommonNode{SMState: smState}, ClientAddr: address.TestAddress, @@ -282,7 +346,7 @@ func newHarness(t *testing.T, ctx context.Context) *harness { // create provider and client dt1 := graphsync.NewGraphSyncDataTransfer(td.Host1, td.GraphSync1, td.DTStoredCounter1) - require.NoError(t, dt1.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{})) + require.NoError(t, dt1.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &shared_testutil.FakeDTValidator{})) client, err := storageimpl.NewClient( network.NewFromLibp2pHost(td.Host1), @@ -295,7 +359,7 @@ func newHarness(t *testing.T, ctx context.Context) *harness { require.NoError(t, err) dt2 := graphsync.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2, td.DTStoredCounter2) - require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{})) + require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &shared_testutil.FakeDTValidator{})) storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr) assert.NoError(t, err) @@ -340,6 +404,7 @@ func newHarness(t *testing.T, ctx context.Context) *harness { ProviderNode: providerNode, ProviderInfo: providerInfo, TestData: td, + SMState: smState, } } @@ -358,15 +423,3 @@ func (h *harness) ProposeStorageDeal(t *testing.T, dataRef *storagemarket.DataRe assert.NoError(t, err) return result } - -type fakeDTValidator struct{} - -func (v *fakeDTValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { - return nil -} - -func (v *fakeDTValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error { - return nil -} - -var _ datatransfer.RequestValidator = (*fakeDTValidator)(nil) diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index 16aadb5d..2c6ccd1f 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -98,6 +98,7 @@ type FakeCommonNode struct { WaitForMessageExitCode exitcode.ExitCode WaitForMessageRetBytes []byte WaitForMessageNodeError error + WaitForMessageCalls []cid.Cid } // GetChainHead returns the state id in the storage market state @@ -129,6 +130,8 @@ func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.A } func (n *FakeCommonNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error { + n.WaitForMessageCalls = append(n.WaitForMessageCalls, mcid) + if n.WaitForMessageError != nil { return n.WaitForMessageError } diff --git a/storagemarket/types.go b/storagemarket/types.go index 57af4f30..0648c56e 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -236,6 +236,9 @@ const ( // ProviderEventFailed indicates a deal has failed and should no longer be processed ProviderEventFailed + + // ProviderEventRestart is used to resume the deal after a state machine shutdown + ProviderEventRestart ) // ProviderEvents maps provider event codes to string names @@ -269,6 +272,7 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventReadMetadataErrored: "ProviderEventReadMetadataErrored", ProviderEventDealCompleted: "ProviderEventDealCompleted", ProviderEventFailed: "ProviderEventFailed", + ProviderEventRestart: "ProviderEventRestart", } type ClientDeal struct { @@ -350,6 +354,9 @@ const ( // ClientEventFailed happens when a deal terminates in failure ClientEventFailed + + // ClientEventRestart is used to resume the deal after a state machine shutdown + ClientEventRestart ) // ClientEvents maps client event codes to string names @@ -375,6 +382,7 @@ var ClientEvents = map[ClientEvent]string{ ClientEventDealActivationFailed: "ClientEventDealActivationFailed", ClientEventDealActivated: "ClientEventDealActivated", ClientEventFailed: "ClientEventFailed", + ClientEventRestart: "ClientEventRestart", } // StorageDeal is a local combination of a proposal and a current deal state @@ -533,9 +541,9 @@ type DataRef struct { // The interface provided by the module to the outside world for storage clients. type StorageClient interface { - Run(ctx context.Context) + Start(ctx context.Context) error - Stop() + Stop() error // ListProviders queries chain state and returns active storage providers ListProviders(ctx context.Context) (<-chan StorageProviderInfo, error)