From 56b1c218068f20afa059c97553537231070cdf08 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Fri, 17 Apr 2020 13:11:17 -0700 Subject: [PATCH 1/4] Add more states in Client and Provider FSM representing async ops: - Waiting for storage market funds to appear - Waiting for deals to be published --- storagemarket/impl/client.go | 23 +- storagemarket/impl/clientstates/client_fsm.go | 25 +- .../impl/clientstates/client_states.go | 36 ++- .../impl/clientstates/cliest_states_test.go | 72 ++--- storagemarket/impl/provider.go | 23 +- .../impl/providerstates/provider_fsm.go | 29 +- .../impl/providerstates/provider_states.go | 36 ++- .../providerstates/provider_states_test.go | 46 ++- storagemarket/integration_test.go | 284 +++++++++--------- storagemarket/testnodes/testnodes.go | 23 +- storagemarket/types.go | 85 ++++-- 11 files changed, 426 insertions(+), 256 deletions(-) diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index ab39d82b..71d9120b 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -275,7 +276,27 @@ func (c *Client) GetPaymentEscrow(ctx context.Context, addr address.Address) (st } func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amount abi.TokenAmount) error { - return c.node.AddFunds(ctx, addr, amount) + done := make(chan error) + + mcid, err := c.node.AddFunds(ctx, addr, amount) + if err != nil { + return err + } + + err = c.node.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { + if code == exitcode.Ok { + done <- nil + } else { + done <- xerrors.Errorf("AddFunds error, exit code: %w", code) + } + return nil + }) + + if err != nil { + return err + } + + return <-done } type clientDealEnvironment struct { diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 4fd6d479..1860736a 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -12,15 +12,21 @@ import ( // ClientEvents are the events that can happen in a storage client var ClientEvents = fsm.Events{ fsm.Event(storagemarket.ClientEventOpen). - From(storagemarket.StorageDealUnknown).ToNoChange(), + From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealEnsureClientFunds), + fsm.Event(storagemarket.ClientEventFundingInitiated). + From(storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealClientFunding). + Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error { + deal.AddFundsCid = mcid + return nil + }), fsm.Event(storagemarket.ClientEventEnsureFundsFailed). - From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealFailing). + From(storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal, err error) error { deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error() return nil }), fsm.Event(storagemarket.ClientEventFundsEnsured). - From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealFundsEnsured), + FromMany(storagemarket.StorageDealEnsureClientFunds, storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFundsEnsured), fsm.Event(storagemarket.ClientEventWriteProposalFailed). From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealError). Action(func(deal *storagemarket.ClientDeal, err error) error { @@ -97,10 +103,11 @@ var ClientEvents = fsm.Events{ // ClientStateEntryFuncs are the handlers for different states in a storage client var ClientStateEntryFuncs = fsm.StateEntryFuncs{ - storagemarket.StorageDealUnknown: EnsureFunds, - storagemarket.StorageDealFundsEnsured: ProposeDeal, - storagemarket.StorageDealValidating: VerifyDealResponse, - storagemarket.StorageDealProposalAccepted: ValidateDealPublished, - storagemarket.StorageDealSealing: VerifyDealActivated, - storagemarket.StorageDealFailing: FailDeal, + storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds, + storagemarket.StorageDealClientFunding: WaitForFunding, + storagemarket.StorageDealFundsEnsured: ProposeDeal, + storagemarket.StorageDealValidating: VerifyDealResponse, + storagemarket.StorageDealProposalAccepted: ValidateDealPublished, + storagemarket.StorageDealSealing: VerifyDealActivated, + storagemarket.StorageDealFailing: FailDeal, } diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index b1ae0ab0..79b73664 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -2,8 +2,10 @@ package clientstates import ( "github.com/filecoin-project/go-statemachine/fsm" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" @@ -24,19 +26,39 @@ type ClientDealEnvironment interface { // ClientStateEntryFunc is the type for all state entry functions on a storage client type ClientStateEntryFunc func(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error -// EnsureFunds attempts to ensure the client has enough funds for the deal being proposed -func EnsureFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - tok, _, err := environment.Node().GetChainHead(ctx.Context()) +// EnsureClientFunds attempts to ensure the client has enough funds for the deal being proposed +func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { + node := environment.Node() + + tok, _, err := node.GetChainHead(ctx.Context()) if err != nil { - return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err) + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("acquiring chain head: %w", err)) } - if err := environment.Node().EnsureFunds( - ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok); err != nil { + mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok) + + if err != nil { return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err) } - return ctx.Trigger(storagemarket.ClientEventFundsEnsured) + // if no message was sent, and there was no error, funds were already available + if mcid == cid.Undef { + return ctx.Trigger(storagemarket.ClientEventFundsEnsured) + } + + return ctx.Trigger(storagemarket.ClientEventFundingInitiated, mcid) +} + +// WaitForFunding waits for an AddFunds message to appear on the chain +func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { + node := environment.Node() + + return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { + if code == exitcode.Ok { + return ctx.Trigger(storagemarket.ClientEventFundsEnsured) + } + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %w", code)) + }) } // ProposeDeal sends the deal proposal to the provider diff --git a/storagemarket/impl/clientstates/cliest_states_test.go b/storagemarket/impl/clientstates/cliest_states_test.go index 8656cb63..509815c1 100644 --- a/storagemarket/impl/clientstates/cliest_states_test.go +++ b/storagemarket/impl/clientstates/cliest_states_test.go @@ -26,7 +26,7 @@ func TestEnsureFunds(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureFunds, storagemarket.StorageDealUnknown, clientDealProposal) + runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealClientFunding, clientDealProposal) node := func(ensureFundsErr error) storagemarket.StorageClientNode { return &testnodes.FakeClientNode{ @@ -36,16 +36,16 @@ func TestEnsureFunds(t *testing.T) { }, } } - t.Run("EnsureFunds succeeds", func(t *testing.T) { + t.Run("EnsureClientFunds succeeds", func(t *testing.T) { runEnsureFunds(t, node(nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFundsEnsured) + require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) - t.Run("EnsureFunds fails", func(t *testing.T) { + t.Run("EnsureClientFunds fails", func(t *testing.T) { runEnsureFunds(t, node(errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) - require.Equal(t, deal.Message, "adding market funds failed: Something went wrong") + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "adding market funds failed: Something went wrong", deal.Message) }) }) } @@ -73,21 +73,21 @@ func TestProposeDeal(t *testing.T) { t.Run("succeeds", func(t *testing.T) { runProposeDeal(t, node(), nil, dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealValidating) + require.Equal(t, storagemarket.StorageDealValidating, deal.State) }) }) t.Run("deal stream lookup fails", func(t *testing.T) { runProposeDeal(t, node(), errors.New("deal stream not found"), nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) - require.Equal(t, deal.Message, "miner connection error: deal stream not found") + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "miner connection error: deal stream not found", deal.Message) }) }) t.Run("write proposal fails fails", func(t *testing.T) { runProposeDeal(t, node(), nil, dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "sending proposal to storage provider failed: write proposal failed") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) }) }) } @@ -128,23 +128,23 @@ func TestVerifyResponse(t *testing.T) { Signature: tut.MakeTestSignature(), })) runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealProposalAccepted) - require.Equal(t, deal.PublishMessage, publishMessage) + require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) + require.Equal(t, publishMessage, deal.PublishMessage) }) }) t.Run("deal stream lookup fails", func(t *testing.T) { dealStreamErr := errors.New("deal stream not found") runVerifyResponse(t, node(false), dealStreamErr, dealStream(nil), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) - require.Equal(t, deal.Message, "miner connection error: deal stream not found") + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "miner connection error: deal stream not found", deal.Message) }) }) t.Run("read response fails", func(t *testing.T) { runVerifyResponse(t, node(false), nil, dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error reading Response message: read response failed") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error reading Response message: read response failed", deal.Message) }) }) @@ -159,8 +159,8 @@ func TestVerifyResponse(t *testing.T) { })) failToVerifyNode := node(true) runVerifyResponse(t, failToVerifyNode, nil, stream, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) - require.Equal(t, deal.Message, "unable to verify signature on deal response") + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "unable to verify signature on deal response", deal.Message) }) }) @@ -174,7 +174,7 @@ func TestVerifyResponse(t *testing.T) { Signature: tut.MakeTestSignature(), })) runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) + require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) }) }) @@ -191,8 +191,8 @@ func TestVerifyResponse(t *testing.T) { })) expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealFailing) - require.Equal(t, expErr, deal.Message) + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, deal.Message, expErr) }) }) @@ -207,8 +207,8 @@ func TestVerifyResponse(t *testing.T) { })) closeStreamErr := errors.New("something went wrong") runVerifyResponse(t, node(false), nil, stream, closeStreamErr, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error attempting to close stream: something went wrong") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) }) }) @@ -233,15 +233,15 @@ func TestValidateDealPublished(t *testing.T) { t.Run("succeeds", func(t *testing.T) { runValidateDealPublished(t, node(abi.DealID(5), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealSealing) - require.Equal(t, deal.DealID, abi.DealID(5)) + require.Equal(t, storagemarket.StorageDealSealing, deal.State) + require.Equal(t, abi.DealID(5), deal.DealID) }) }) t.Run("fails", func(t *testing.T) { runValidateDealPublished(t, node(abi.DealID(5), errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error validating deal published: Something went wrong") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error validating deal published: Something went wrong", deal.Message) }) }) } @@ -265,21 +265,21 @@ func TestVerifyDealActivated(t *testing.T) { t.Run("succeeds", func(t *testing.T) { runVerifyDealActivated(t, node(nil, nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealActive) + require.Equal(t, storagemarket.StorageDealActive, deal.State) }) }) t.Run("fails synchronously", func(t *testing.T) { runVerifyDealActivated(t, node(errors.New("Something went wrong"), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error in deal activation: Something went wrong") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error in deal activation: Something went wrong", deal.Message) }) }) t.Run("fails asynchronously", func(t *testing.T) { runVerifyDealActivated(t, node(nil, errors.New("Something went wrong later")), nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error in deal activation: Something went wrong later") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error in deal activation: Something went wrong later", deal.Message) }) }) } @@ -293,14 +293,14 @@ func TestFailDeal(t *testing.T) { t.Run("able to close stream", func(t *testing.T) { runFailDeal(t, nil, nil, nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) + require.Equal(t, storagemarket.StorageDealError, deal.State) }) }) t.Run("unable to close stream", func(t *testing.T) { runFailDeal(t, nil, nil, nil, errors.New("unable to close"), func(deal storagemarket.ClientDeal) { - require.Equal(t, deal.State, storagemarket.StorageDealError) - require.Equal(t, deal.Message, "error attempting to close stream: unable to close") + require.Equal(t, storagemarket.StorageDealError, deal.State) + require.Equal(t, "error attempting to close stream: unable to close", deal.Message) }) }) } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 88f93cd7..975f3844 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -9,6 +9,7 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -247,7 +248,27 @@ func (p *Provider) ListDeals(ctx context.Context) ([]storagemarket.StorageDeal, } func (p *Provider) AddStorageCollateral(ctx context.Context, amount abi.TokenAmount) error { - return p.spn.AddFunds(ctx, p.actor, amount) + done := make(chan error) + + mcid, err := p.spn.AddFunds(ctx, p.actor, amount) + if err != nil { + return err + } + + err = p.spn.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { + if code == exitcode.Ok { + done <- nil + } else { + done <- xerrors.Errorf("AddFunds error, exit code: %w", code) + } + return nil + }) + + if err != nil { + return err + } + + return <-done } func (p *Provider) GetStorageCollateral(ctx context.Context) (storagemarket.Balance, error) { diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index e908ba60..73d84627 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -3,6 +3,7 @@ package providerstates import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/filestore" @@ -44,12 +45,20 @@ var ProviderEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ProviderEventVerifiedData). - FromMany(storagemarket.StorageDealVerifyData, storagemarket.StorageDealWaitingForData).To(storagemarket.StorageDealPublishing). + FromMany(storagemarket.StorageDealVerifyData, storagemarket.StorageDealWaitingForData).To(storagemarket.StorageDealEnsureProviderFunds). Action(func(deal *storagemarket.MinerDeal, path filestore.Path, metadataPath filestore.Path) error { deal.PiecePath = path deal.MetadataPath = metadataPath return nil }), + fsm.Event(storagemarket.ProviderEventFundingInitiated). + From(storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealProviderFunding). + Action(func(deal *storagemarket.MinerDeal, mcid cid.Cid) error { + deal.AddFundsCid = mcid + return nil + }), + fsm.Event(storagemarket.ProviderEventFunded). + FromMany(storagemarket.StorageDealProviderFunding, storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealPublishing), fsm.Event(storagemarket.ProviderEventSendResponseFailed). FromMany(storagemarket.StorageDealPublishing, storagemarket.StorageDealFailing).To(storagemarket.StorageDealError). Action(func(deal *storagemarket.MinerDeal, err error) error { @@ -103,12 +112,14 @@ var ProviderEvents = fsm.Events{ // ProviderStateEntryFuncs are the handlers for different states in a storage client var ProviderStateEntryFuncs = fsm.StateEntryFuncs{ - storagemarket.StorageDealValidating: ValidateDealProposal, - storagemarket.StorageDealProposalAccepted: TransferData, - storagemarket.StorageDealVerifyData: VerifyData, - storagemarket.StorageDealPublishing: PublishDeal, - storagemarket.StorageDealStaged: HandoffDeal, - storagemarket.StorageDealSealing: VerifyDealActivated, - storagemarket.StorageDealActive: RecordPieceInfo, - storagemarket.StorageDealFailing: FailDeal, + storagemarket.StorageDealValidating: ValidateDealProposal, + storagemarket.StorageDealProposalAccepted: TransferData, + storagemarket.StorageDealVerifyData: VerifyData, + storagemarket.StorageDealEnsureProviderFunds: EnsureProviderFunds, + storagemarket.StorageDealProviderFunding: WaitForFunding, + storagemarket.StorageDealPublishing: PublishDeal, + storagemarket.StorageDealStaged: HandoffDeal, + storagemarket.StorageDealSealing: VerifyDealActivated, + storagemarket.StorageDealActive: RecordPieceInfo, + storagemarket.StorageDealFailing: FailDeal, } diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 65e6e8b0..d4d3d15e 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" @@ -159,23 +160,46 @@ func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal stora return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath) } -// PublishDeal publishes a deal on chain and sends the deal id back to the client -func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - tok, _, err := environment.Node().GetChainHead(ctx.Context()) +func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { + node := environment.Node() + + tok, _, err := node.GetChainHead(ctx.Context()) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err)) } - waddr, err := environment.Node().GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok) + waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) } - // TODO: check StorageCollateral (may be too large (or too small)) - if err := environment.Node().EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok); err != nil { + mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok) + + if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) } + // if no message was sent, and there was no error, it was instantaneous + if mcid == cid.Undef { + return ctx.Trigger(storagemarket.ProviderEventFunded) + } + + return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid) +} + +func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { + node := environment.Node() + + return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { + if code == exitcode.Ok { + return ctx.Trigger(storagemarket.ProviderEventFunded) + } + return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %w", code)) + }) +} + +// PublishDeal publishes a deal on chain and sends the deal id back to the client +func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { smDeal := storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 0340318f..d979237a 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -204,7 +204,7 @@ func TestVerifyData(t *testing.T) { MetadataPath: expMetaPath, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealPublishing, deal.State) + require.Equal(t, storagemarket.StorageDealEnsureProviderFunds, deal.State) require.Equal(t, expPath, deal.PiecePath) require.Equal(t, expMetaPath, deal.MetadataPath) }, @@ -235,12 +235,11 @@ func TestVerifyData(t *testing.T) { } } -func TestPublishDeal(t *testing.T) { +func TestEnsureProviderFunds(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) - runPublishDeal := makeExecutor(ctx, eventProcessor, providerstates.PublishDeal, storagemarket.StorageDealPublishing) - expDealID := abi.DealID(rand.Uint64()) + runEnsureProviderFunds := makeExecutor(ctx, eventProcessor, providerstates.EnsureProviderFunds, storagemarket.StorageDealEnsureProviderFunds) tests := map[string]struct { nodeParams nodeParams dealParams dealParams @@ -250,13 +249,8 @@ func TestPublishDeal(t *testing.T) { dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ "succeeds": { - nodeParams: nodeParams{ - PublishDealID: expDealID, - }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealStaged, deal.State) - require.Equal(t, expDealID, deal.DealID) - require.Equal(t, true, deal.ConnectionClosed) + require.Equal(t, storagemarket.StorageDealPublishing, deal.State) }, }, "get miner worker fails": { @@ -277,6 +271,38 @@ func TestPublishDeal(t *testing.T) { require.Equal(t, "error calling node: ensuring funds: not enough funds", deal.Message) }, }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + runEnsureProviderFunds(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) + }) + } +} + +func TestPublishDeal(t *testing.T) { + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) + require.NoError(t, err) + runPublishDeal := makeExecutor(ctx, eventProcessor, providerstates.PublishDeal, storagemarket.StorageDealPublishing) + expDealID := abi.DealID(rand.Uint64()) + tests := map[string]struct { + nodeParams nodeParams + dealParams dealParams + environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams + dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + }{ + "succeeds": { + nodeParams: nodeParams{ + PublishDealID: expDealID, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealStaged, deal.State) + require.Equal(t, expDealID, deal.DealID) + require.Equal(t, true, deal.ConnectionClosed) + }, + }, "PublishDealsErrors errors": { nodeParams: nodeParams{ PublishDealsError: errors.New("could not post to chain"), diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index dfeeca36..5e118bde 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -35,53 +35,9 @@ import ( func TestMakeDeal(t *testing.T) { ctx := context.Background() - epoch := abi.ChainEpoch(100) - nodeCommon := testnodes.FakeCommonNode{SMState: testnodes.NewStorageMarketState()} - td := shared_testutil.NewLibp2pTestData(ctx, t) - rootLink := td.LoadUnixFSFile(t, "payload.txt", false) - payloadCid := rootLink.(cidlink.Link).Cid - - clientNode := testnodes.FakeClientNode{ - FakeCommonNode: nodeCommon, - ClientAddr: address.TestAddress, - } - - providerAddr := address.TestAddress2 - tempPath, err := ioutil.TempDir("", "storagemarket_test") - assert.NoError(t, err) - ps := piecestore.NewPieceStore(td.Ds2) - providerNode := testnodes.FakeProviderNode{ - FakeCommonNode: nodeCommon, - MinerAddr: providerAddr, - } - fs, err := filestore.NewLocalFileStore(filestore.OsPath(tempPath)) - assert.NoError(t, err) - - // create provider and client - dt1 := graphsync.NewGraphSyncDataTransfer(td.Host1, td.GraphSync1) - require.NoError(t, dt1.RegisterVoucherType(reflect.TypeOf(&requestvalidation.StorageDataTransferVoucher{}), &fakeDTValidator{})) - - client, err := storageimpl.NewClient( - network.NewFromLibp2pHost(td.Host1), - td.Bs1, - dt1, - discovery.NewLocal(td.Ds1), - td.Ds1, - &clientNode, - ) - require.NoError(t, err) - dt2 := graphsync.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2) - provider, err := storageimpl.NewProvider( - network.NewFromLibp2pHost(td.Host2), - td.Ds2, - td.Bs2, - fs, - ps, - dt2, - &providerNode, - providerAddr, - abi.RegisteredProof_StackedDRG2KiBPoSt, - ) + h := newHarness(t, ctx) + h.Client.Run(ctx) + err := h.Provider.Start(ctx) assert.NoError(t, err) // set up a subscriber @@ -89,36 +45,10 @@ func TestMakeDeal(t *testing.T) { subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { dealChan <- deal } - _ = provider.SubscribeToEvents(subscriber) - - // set ask price where we'll accept any price - err = provider.AddAsk(big.NewInt(0), 50_000) - assert.NoError(t, err) - - err = provider.Start(ctx) - assert.NoError(t, err) - - // Closely follows the MinerInfo struct in the spec - providerInfo := storagemarket.StorageProviderInfo{ - Address: providerAddr, - Owner: providerAddr, - Worker: providerAddr, - SectorSize: 1 << 20, - PeerID: td.Host2.ID(), - } + _ = h.Provider.SubscribeToEvents(subscriber) - var proposalCid cid.Cid - - // make a deal - client.Run(ctx) - dataRef := &storagemarket.DataRef{ - TransferType: storagemarket.TTGraphsync, - Root: payloadCid, - } - result, err := client.ProposeStorageDeal(ctx, providerAddr, &providerInfo, dataRef, abi.ChainEpoch(epoch+100), abi.ChainEpoch(epoch+20100), big.NewInt(1), big.NewInt(0), abi.RegisteredProof_StackedDRG2KiBPoSt) - assert.NoError(t, err) - - proposalCid = result.ProposalCid + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) + proposalCid := result.ProposalCid time.Sleep(time.Millisecond * 100) @@ -140,6 +70,7 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealTransferring, storagemarket.StorageDealVerifyData, + storagemarket.StorageDealEnsureProviderFunds, storagemarket.StorageDealPublishing, storagemarket.StorageDealStaged, storagemarket.StorageDealSealing, @@ -149,33 +80,130 @@ func TestMakeDeal(t *testing.T) { assert.Equal(t, expectedStates, actualStates) // check a couple of things to make sure we're getting the whole deal - assert.Equal(t, td.Host1.ID(), seenDeal.Client) + assert.Equal(t, h.TestData.Host1.ID(), seenDeal.Client) assert.Empty(t, seenDeal.Message) assert.Equal(t, proposalCid, seenDeal.ProposalCid) - assert.Equal(t, providerAddr, seenDeal.ClientDealProposal.Proposal.Provider) + assert.Equal(t, h.ProviderAddr, seenDeal.ClientDealProposal.Proposal.Provider) - cd, err := client.GetLocalDeal(ctx, proposalCid) + cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, int(storagemarket.StorageDealActive), int(cd.State)) + assert.Equal(t, storagemarket.StorageDealActive, cd.State) - providerDeals, err := provider.ListLocalDeals() + providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) pd := providerDeals[0] - assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, int(storagemarket.StorageDealCompleted), int(pd.State)) + assert.Equal(t, pd.ProposalCid, proposalCid) + assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) } func TestMakeDealOffline(t *testing.T) { ctx := context.Background() + h := newHarness(t, ctx) + h.Client.Run(ctx) + + carBuf := new(bytes.Buffer) + + err := cario.NewCarIO().WriteCar(ctx, h.TestData.Bs1, h.PayloadCid, h.TestData.AllSelector, carBuf) + require.NoError(t, err) + + commP, size, err := pieceio.GeneratePieceCommitment(abi.RegisteredProof_StackedDRG2KiBPoSt, carBuf, uint64(carBuf.Len())) + assert.NoError(t, err) + + dataRef := &storagemarket.DataRef{ + TransferType: storagemarket.TTManual, + Root: h.PayloadCid, + PieceCid: &commP, + PieceSize: size, + } + + result := h.ProposeStorageDeal(t, dataRef) + proposalCid := result.ProposalCid + + time.Sleep(time.Millisecond * 100) + + cd, err := h.Client.GetLocalDeal(ctx, proposalCid) + assert.NoError(t, err) + assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + + providerDeals, err := h.Provider.ListLocalDeals() + assert.NoError(t, err) + + pd := providerDeals[0] + assert.True(t, pd.ProposalCid.Equals(proposalCid)) + assert.Equal(t, storagemarket.StorageDealWaitingForData, pd.State) + + err = cario.NewCarIO().WriteCar(ctx, h.TestData.Bs1, h.PayloadCid, h.TestData.AllSelector, carBuf) + require.NoError(t, err) + err = h.Provider.ImportDataForDeal(ctx, pd.ProposalCid, carBuf) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 100) + + cd, err = h.Client.GetLocalDeal(ctx, proposalCid) + assert.NoError(t, err) + assert.Equal(t, storagemarket.StorageDealActive, cd.State) + + providerDeals, err = h.Provider.ListLocalDeals() + assert.NoError(t, err) + + pd = providerDeals[0] + assert.True(t, pd.ProposalCid.Equals(proposalCid)) + assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) +} + +func TestMakeDealNonBlocking(t *testing.T) { + ctx := context.Background() + h := newHarness(t, ctx) + testCids := shared_testutil.GenerateCids(2) + + h.ClientNode.AddFundsCid = testCids[0] + h.Client.Run(ctx) + + h.ProviderNode.BlockOnMessageWait = true + h.ProviderNode.AddFundsCid = testCids[1] + err := h.Provider.Start(ctx) + assert.NoError(t, err) + + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) + + time.Sleep(time.Millisecond * 500) + + cd, err := h.Client.GetLocalDeal(ctx, result.ProposalCid) + assert.NoError(t, err) + assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + + providerDeals, err := h.Provider.ListLocalDeals() + assert.NoError(t, err) + + // Provider should be blocking on waiting for funds to appear on chain + pd := providerDeals[0] + assert.Equal(t, result.ProposalCid, pd.ProposalCid) + assert.Equal(t, storagemarket.StorageDealProviderFunding, pd.State) +} + +type harness struct { + Ctx context.Context + Epoch abi.ChainEpoch + PayloadCid cid.Cid + ProviderAddr address.Address + Client storagemarket.StorageClient + ClientNode *testnodes.FakeClientNode + Provider storagemarket.StorageProvider + ProviderNode *testnodes.FakeProviderNode + ProviderInfo storagemarket.StorageProviderInfo + TestData *shared_testutil.Libp2pTestData +} + +func newHarness(t *testing.T, ctx context.Context) *harness { epoch := abi.ChainEpoch(100) - nodeCommon := testnodes.FakeCommonNode{SMState: testnodes.NewStorageMarketState()} td := shared_testutil.NewLibp2pTestData(ctx, t) rootLink := td.LoadUnixFSFile(t, "payload.txt", false) payloadCid := rootLink.(cidlink.Link).Cid + smState := testnodes.NewStorageMarketState() clientNode := testnodes.FakeClientNode{ - FakeCommonNode: nodeCommon, + FakeCommonNode: testnodes.FakeCommonNode{SMState: smState}, ClientAddr: address.TestAddress, } @@ -183,8 +211,8 @@ func TestMakeDealOffline(t *testing.T) { tempPath, err := ioutil.TempDir("", "storagemarket_test") assert.NoError(t, err) ps := piecestore.NewPieceStore(td.Ds2) - providerNode := testnodes.FakeProviderNode{ - FakeCommonNode: nodeCommon, + providerNode := &testnodes.FakeProviderNode{ + FakeCommonNode: testnodes.FakeCommonNode{SMState: smState, BlockOnMessageWait: true}, MinerAddr: providerAddr, } fs, err := filestore.NewLocalFileStore(filestore.OsPath(tempPath)) @@ -206,12 +234,12 @@ func TestMakeDealOffline(t *testing.T) { dt2 := graphsync.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2) provider, err := storageimpl.NewProvider( network.NewFromLibp2pHost(td.Host2), - td.Ds1, + td.Ds2, td.Bs2, fs, ps, dt2, - &providerNode, + providerNode, providerAddr, abi.RegisteredProof_StackedDRG2KiBPoSt, ) @@ -233,62 +261,34 @@ func TestMakeDealOffline(t *testing.T) { PeerID: td.Host2.ID(), } - var proposalCid cid.Cid - - // make a deal - client.Run(ctx) - dataRef := &storagemarket.DataRef{ - TransferType: storagemarket.TTManual, - Root: payloadCid, + return &harness{ + Ctx: ctx, + Epoch: epoch, + PayloadCid: payloadCid, + ProviderAddr: providerAddr, + Client: client, + ClientNode: &clientNode, + Provider: provider, + ProviderNode: providerNode, + ProviderInfo: providerInfo, + TestData: td, } +} - carBuf := new(bytes.Buffer) - - err = cario.NewCarIO().WriteCar(ctx, td.Bs1, payloadCid, td.AllSelector, carBuf) - require.NoError(t, err) - - commP, size, err := pieceio.GeneratePieceCommitment(abi.RegisteredProof_StackedDRG2KiBPoSt, carBuf, uint64(carBuf.Len())) - - assert.NoError(t, err) - - dataRef.PieceCid = &commP - dataRef.PieceSize = size - - result, err := client.ProposeStorageDeal(ctx, providerAddr, &providerInfo, dataRef, abi.ChainEpoch(epoch+100), abi.ChainEpoch(epoch+20100), big.NewInt(1), big.NewInt(0), abi.RegisteredProof_StackedDRG2KiBPoSt) - assert.NoError(t, err) - - proposalCid = result.ProposalCid - - time.Sleep(time.Millisecond * 100) - - cd, err := client.GetLocalDeal(ctx, proposalCid) - assert.NoError(t, err) - assert.Equal(t, cd.State, storagemarket.StorageDealValidating) - - providerDeals, err := provider.ListLocalDeals() - assert.NoError(t, err) - - pd := providerDeals[0] - assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, pd.State, storagemarket.StorageDealWaitingForData) - - err = cario.NewCarIO().WriteCar(ctx, td.Bs1, payloadCid, td.AllSelector, carBuf) - require.NoError(t, err) - err = provider.ImportDataForDeal(ctx, pd.ProposalCid, carBuf) - require.NoError(t, err) - - time.Sleep(time.Millisecond * 100) - - cd, err = client.GetLocalDeal(ctx, proposalCid) - assert.NoError(t, err) - assert.Equal(t, cd.State, storagemarket.StorageDealActive) - - providerDeals, err = provider.ListLocalDeals() +func (h *harness) ProposeStorageDeal(t *testing.T, dataRef *storagemarket.DataRef) *storagemarket.ProposeStorageDealResult { + result, err := h.Client.ProposeStorageDeal( + h.Ctx, + h.ProviderAddr, + &h.ProviderInfo, + dataRef, + h.Epoch+100, + h.Epoch+20100, + big.NewInt(1), + big.NewInt(0), + abi.RegisteredProof_StackedDRG2KiBPoSt, + ) assert.NoError(t, err) - - pd = providerDeals[0] - assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, pd.State, storagemarket.StorageDealCompleted) + return result } type fakeDTValidator struct{} diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index 465fe5d6..2f0bdcb8 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -3,12 +3,14 @@ package testnodes import ( "context" "io" + "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" "github.com/filecoin-project/go-fil-markets/shared" @@ -86,10 +88,13 @@ func (sma *StorageMarketState) AddDeal(deal storagemarket.StorageDeal) (shared.T // FakeCommonNode has the common methods for the storage & client node adapters type FakeCommonNode struct { SMState *StorageMarketState + BlockOnMessageWait bool + AddFundsCid cid.Cid EnsureFundsError error VerifySignatureFails bool GetBalanceError error GetChainHeadError error + WaitForMessageExit exitcode.ExitCode } // GetChainHead returns the state id in the storage market state @@ -103,20 +108,28 @@ func (n *FakeCommonNode) GetChainHead(ctx context.Context) (shared.TipSetToken, } // AddFunds adds funds to the given actor in the storage market state -func (n *FakeCommonNode) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error { +func (n *FakeCommonNode) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) (cid.Cid, error) { n.SMState.AddFunds(addr, amount) - return nil + return n.AddFundsCid, nil } // EnsureFunds adds funds to the given actor in the storage market state to insure it has at least the given amount -func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) error { +func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) (cid.Cid, error) { if n.EnsureFundsError == nil { balance := n.SMState.Balance(addr) if balance.Available.LessThan(amount) { - n.SMState.AddFunds(addr, big.Sub(amount, balance.Available)) + return n.AddFunds(ctx, addr, big.Sub(amount, balance.Available)) } } - return n.EnsureFundsError + + return cid.Undef, n.EnsureFundsError +} + +func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error { + if n.BlockOnMessageWait { + time.Sleep(5 * time.Second) + } + return onCompletion(n.WaitForMessageExit, nil) } // GetBalance returns the funds in the storage market state diff --git a/storagemarket/types.go b/storagemarket/types.go index 94f176f3..d9eb06fe 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" "github.com/libp2p/go-libp2p-core/peer" @@ -20,6 +21,7 @@ import ( const DealProtocolID = "/fil/storage/mk/1.0.1" const AskProtocolID = "/fil/storage/ask/1.0.1" +const ChainConfidence = 10 type Balance struct { Locked abi.TokenAmount @@ -41,14 +43,18 @@ const ( // Internal - StorageDealFundsEnsured // Deposited funds as neccesary to create a deal, ready to move forward - StorageDealValidating // Verifying that deal parameters are good - StorageDealTransferring // Moving data - StorageDealWaitingForData // Manual transfer - StorageDealVerifyData // Verify transferred data - generate CAR / piece data - StorageDealPublishing // Publishing deal to chain - StorageDealError // deal failed with an unexpected error - StorageDealCompleted // on provider side, indicates deal is active and info for retrieval is recorded + StorageDealFundsEnsured // Deposited funds as neccesary to create a deal, ready to move forward + StorageDealValidating // Verifying that deal parameters are good + StorageDealTransferring // Moving data + StorageDealWaitingForData // Manual transfer + StorageDealVerifyData // Verify transferred data - generate CAR / piece data + StorageDealEnsureProviderFunds // Ensuring that provider collateral is sufficient + StorageDealEnsureClientFunds // Ensuring that client funds are sufficient + StorageDealProviderFunding // Waiting for funds to appear in Provider balance + StorageDealClientFunding // Waiting for funds to appear in Client balance + StorageDealPublishing // Publishing deal to chain + StorageDealError // deal failed with an unexpected error + StorageDealCompleted // on provider side, indicates deal is active and info for retrieval is recorded ) var DealStates = []string{ @@ -67,6 +73,10 @@ var DealStates = []string{ "StorageDealTransferring", "StorageDealWaitingForData", "StorageDealVerifyData", + "StorageDealEnsureProviderFunds", + "StorageDealEnsureClientFunds", + "StorageDealProviderFunding", + "StorageDealClientFunding", "StorageDealPublishing", "StorageDealError", "StorageDealCompleted", @@ -116,6 +126,7 @@ var StorageAskUndefined = StorageAsk{} type MinerDeal struct { market.ClientDealProposal ProposalCid cid.Cid + AddFundsCid cid.Cid Miner peer.ID Client peer.ID State StorageDealStatus @@ -148,6 +159,15 @@ const ( // meaning the provider must wait until it receives data manually ProviderEventWaitingForManualData + // ProviderEventInsufficientFunds indicates not enough funds available for a deal + ProviderEventInsufficientFunds + + // ProviderEventFundingInitiated indicates provider collateral funding has been initiated + ProviderEventFundingInitiated + + // ProviderEventFunded indicates provider collateral has appeared in the storage market balance + ProviderEventFunded + // ProviderEventDataTransferFailed happens when an error occurs transferring data ProviderEventDataTransferFailed @@ -207,6 +227,7 @@ const ( type ClientDeal struct { market.ClientDealProposal ProposalCid cid.Cid + AddFundsCid cid.Cid State StorageDealStatus Miner peer.ID MinerWorker address.Address @@ -225,6 +246,9 @@ const ( // ClientEventEnsureFundsFailed happens when attempting to ensure the client has enough funds available fails ClientEventEnsureFundsFailed + // ClientEventFundsInitiated happens when a client has sent a message adding funds to its balance + ClientEventFundingInitiated + // ClientEventFundsEnsured happens when a client successfully ensures it has funds for a deal ClientEventFundsEnsured @@ -277,6 +301,11 @@ type StorageDeal struct { market.DealState } +type DealSectorCommittedCallback func(err error) +type FundsAddedCallback func(err error) +type DealsPublishedCallback func(err error) +type MessagePublishedCallback func(mcid cid.Cid, err error) + // Subscriber is a callback that is called when events are emitted type ProviderSubscriber func(event ProviderEvent, deal MinerDeal) @@ -308,23 +337,30 @@ type StorageProvider interface { SubscribeToEvents(subscriber ProviderSubscriber) shared.Unsubscribe } -// Node dependencies for a StorageProvider -type StorageProviderNode interface { - GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) - - // Verify a signature against an address + data - VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error) - +type StorageFunds interface { // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. - AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error + AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) (cid.Cid, error) // Ensures that a storage market participant has a certain amount of available funds // If additional funds are needed, they will be sent from the 'wallet' address - EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) error + // callback is immediately called if sufficient funds are available + EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) (cid.Cid, error) // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. GetBalance(ctx context.Context, addr address.Address, tok shared.TipSetToken) (Balance, error) + // Verify a signature against an address + data + VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error) + + WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error +} + +// Node dependencies for a StorageProvider +type StorageProviderNode interface { + StorageFunds + + GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) + // Publishes deal on chain PublishDeals(ctx context.Context, deal MinerDeal) (abi.DealID, cid.Cid, error) @@ -345,22 +381,11 @@ type StorageProviderNode interface { LocatePieceForDealWithinSector(ctx context.Context, dealID abi.DealID, tok shared.TipSetToken) (sectorID uint64, offset uint64, length uint64, err error) } -type DealSectorCommittedCallback func(err error) - // Node dependencies for a StorageClient type StorageClientNode interface { - GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) + StorageFunds - // Verify a signature against an address + data - VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error) - - // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. - AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error - - EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) error - - // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. - GetBalance(ctx context.Context, addr address.Address, tok shared.TipSetToken) (Balance, error) + GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) // ListClientDeals lists all on-chain deals associated with a storage client ListClientDeals(ctx context.Context, addr address.Address, tok shared.TipSetToken) ([]StorageDeal, error) From 7e159cdcb70cea03b28f8f45ac17770552657918 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Fri, 24 Apr 2020 10:55:56 -0700 Subject: [PATCH 2/4] Publishing deals doesn't block --- ...t_states_test.go => client_states_test.go} | 0 .../impl/providerstates/provider_fsm.go | 17 +++- .../impl/providerstates/provider_states.go | 51 ++++++++---- .../providerstates/provider_states_test.go | 82 +++++++++++++++---- storagemarket/integration_test.go | 19 ++++- storagemarket/testnodes/testnodes.go | 20 +++-- storagemarket/types.go | 12 ++- 7 files changed, 157 insertions(+), 44 deletions(-) rename storagemarket/impl/clientstates/{cliest_states_test.go => client_states_test.go} (100%) diff --git a/storagemarket/impl/clientstates/cliest_states_test.go b/storagemarket/impl/clientstates/client_states_test.go similarity index 100% rename from storagemarket/impl/clientstates/cliest_states_test.go rename to storagemarket/impl/clientstates/client_states_test.go diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 73d84627..a8b62995 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -58,7 +58,19 @@ var ProviderEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ProviderEventFunded). - FromMany(storagemarket.StorageDealProviderFunding, storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealPublishing), + FromMany(storagemarket.StorageDealProviderFunding, storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealPublish), + fsm.Event(storagemarket.ProviderEventDealPublishInitiated). + From(storagemarket.StorageDealPublish).To(storagemarket.StorageDealPublishing). + Action(func(deal *storagemarket.MinerDeal, publishCid cid.Cid) error { + deal.PublishCid = publishCid + return nil + }), + fsm.Event(storagemarket.ProviderEventDealPublishError). + From(storagemarket.StorageDealPublishing).To(storagemarket.StorageDealFailing). + Action(func(deal *storagemarket.MinerDeal, err error) error { + deal.Message = xerrors.Errorf("PublishStorageDeal error: %w", err).Error() + return nil + }), fsm.Event(storagemarket.ProviderEventSendResponseFailed). FromMany(storagemarket.StorageDealPublishing, storagemarket.StorageDealFailing).To(storagemarket.StorageDealError). Action(func(deal *storagemarket.MinerDeal, err error) error { @@ -117,7 +129,8 @@ var ProviderStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealVerifyData: VerifyData, storagemarket.StorageDealEnsureProviderFunds: EnsureProviderFunds, storagemarket.StorageDealProviderFunding: WaitForFunding, - storagemarket.StorageDealPublishing: PublishDeal, + storagemarket.StorageDealPublish: PublishDeal, + storagemarket.StorageDealPublishing: WaitForPublish, storagemarket.StorageDealStaged: HandoffDeal, storagemarket.StorageDealSealing: VerifyDealActivated, storagemarket.StorageDealActive: RecordPieceInfo, diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index d4d3d15e..d864d348 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -1,6 +1,7 @@ package providerstates import ( + "bytes" "context" "github.com/filecoin-project/go-address" @@ -9,6 +10,7 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -198,7 +200,7 @@ func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal s }) } -// PublishDeal publishes a deal on chain and sends the deal id back to the client +// PublishDeal sends a message to publish a deal on chain func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { smDeal := storagemarket.MinerDeal{ Client: deal.Client, @@ -208,27 +210,44 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor Ref: deal.Ref, } - dealID, mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) + // TODO: PublishDeals does not return the deal id, change API + _, mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err)) } - err = environment.SendSignedResponse(ctx.Context(), &network.Response{ - State: storagemarket.StorageDealProposalAccepted, - - Proposal: deal.ProposalCid, - PublishMessage: &mcid, - }) - - if err != nil { - return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) - } + return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid) +} - if err := environment.Disconnect(deal.ProposalCid); err != nil { - log.Warnf("closing client connection: %+v", err) - } +// WaitForPublish waits for the publish message on chain and sends the deal id back to the client +func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { + return environment.Node().WaitForMessage(deal.PublishCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, retBytes []byte) error { + if code == exitcode.Ok { + var retval market.PublishStorageDealsReturn + err := retval.UnmarshalCBOR(bytes.NewReader(retBytes)) + if err != nil { + return err + } + + err = environment.SendSignedResponse(ctx.Context(), &network.Response{ + State: storagemarket.StorageDealProposalAccepted, + Proposal: deal.ProposalCid, + PublishMessage: &deal.PublishCid, + }) + + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) + } + + if err := environment.Disconnect(deal.ProposalCid); err != nil { + log.Warnf("closing client connection: %+v", err) + } + + return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) + } - return ctx.Trigger(storagemarket.ProviderEventDealPublished, dealID) + return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %w", code)) + }) } // HandoffDeal hands off a published deal for sealing and commitment in a sector diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index d979237a..1bfccf0c 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -14,7 +14,9 @@ import ( fsmtest "github.com/filecoin-project/go-statemachine/fsm/testutil" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" + "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -250,7 +252,7 @@ func TestEnsureProviderFunds(t *testing.T) { }{ "succeeds": { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { - require.Equal(t, storagemarket.StorageDealPublishing, deal.State) + require.Equal(t, storagemarket.StorageDealPublish, deal.State) }, }, "get miner worker fails": { @@ -283,8 +285,51 @@ func TestPublishDeal(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) - runPublishDeal := makeExecutor(ctx, eventProcessor, providerstates.PublishDeal, storagemarket.StorageDealPublishing) + runPublishDeal := makeExecutor(ctx, eventProcessor, providerstates.PublishDeal, storagemarket.StorageDealPublish) + tests := map[string]struct { + nodeParams nodeParams + dealParams dealParams + environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams + dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + }{ + "succeeds": { + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealPublishing, deal.State) + }, + }, + "PublishDealsErrors errors": { + nodeParams: nodeParams{ + PublishDealsError: errors.New("could not post to chain"), + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "error calling node: publishing deal: could not post to chain", deal.Message) + }, + }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + runPublishDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) + }) + } +} + +func TestWaitForPublish(t *testing.T) { + log.SetDebugLogging() + + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) + require.NoError(t, err) + runWaitForPublish := makeExecutor(ctx, eventProcessor, providerstates.WaitForPublish, storagemarket.StorageDealPublishing) expDealID := abi.DealID(rand.Uint64()) + + psdReturn := market.PublishStorageDealsReturn{IDs: []abi.DealID{expDealID}} + psdReturnBytes := bytes.NewBuffer([]byte{}) + err = psdReturn.MarshalCBOR(psdReturnBytes) + require.NoError(t, err) + tests := map[string]struct { nodeParams nodeParams dealParams dealParams @@ -295,7 +340,7 @@ func TestPublishDeal(t *testing.T) { }{ "succeeds": { nodeParams: nodeParams{ - PublishDealID: expDealID, + WaitForMessageRetBytes: psdReturnBytes.Bytes(), }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, storagemarket.StorageDealStaged, deal.State) @@ -303,16 +348,19 @@ func TestPublishDeal(t *testing.T) { require.Equal(t, true, deal.ConnectionClosed) }, }, - "PublishDealsErrors errors": { + "PublishStorageDeal errors": { nodeParams: nodeParams{ - PublishDealsError: errors.New("could not post to chain"), + WaitForMessageExitCode: exitcode.SysErrForbidden, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "error calling node: publishing deal: could not post to chain", deal.Message) + require.Equal(t, "PublishStorageDeal error: PublishStorageDeals exit code: SysErrForbidden(8)", deal.Message) }, }, "SendSignedResponse errors": { + nodeParams: nodeParams{ + WaitForMessageRetBytes: psdReturnBytes.Bytes(), + }, environmentParams: environmentParams{ SendSignedResponseError: errors.New("could not send"), }, @@ -324,7 +372,7 @@ func TestPublishDeal(t *testing.T) { } for test, data := range tests { t.Run(test, func(t *testing.T) { - runPublishDeal(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) + runWaitForPublish(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) }) } } @@ -634,12 +682,15 @@ type nodeParams struct { MostRecentStateIDError error PieceLength uint64 PieceSectorID uint64 - PublishDealID abi.DealID PublishDealsError error OnDealCompleteError error LocatePieceForDealWithinSectorError error DealCommittedSyncError error DealCommittedAsyncError error + WaitForMessageBlocks bool + WaitForMessageError error + WaitForMessageExitCode exitcode.ExitCode + WaitForMessageRetBytes []byte } type dealParams struct { @@ -702,11 +753,15 @@ func makeExecutor(ctx context.Context, } common := testnodes.FakeCommonNode{ - SMState: smstate, - GetChainHeadError: nodeParams.MostRecentStateIDError, - GetBalanceError: nodeParams.ClientMarketBalanceError, - VerifySignatureFails: nodeParams.VerifySignatureFails, - EnsureFundsError: nodeParams.EnsureFundsError, + SMState: smstate, + GetChainHeadError: nodeParams.MostRecentStateIDError, + GetBalanceError: nodeParams.ClientMarketBalanceError, + VerifySignatureFails: nodeParams.VerifySignatureFails, + EnsureFundsError: nodeParams.EnsureFundsError, + WaitForMessageBlocks: nodeParams.WaitForMessageBlocks, + WaitForMessageError: nodeParams.WaitForMessageError, + WaitForMessageExitCode: nodeParams.WaitForMessageExitCode, + WaitForMessageRetBytes: nodeParams.WaitForMessageRetBytes, } node := &testnodes.FakeProviderNode{ @@ -715,7 +770,6 @@ func makeExecutor(ctx context.Context, MinerWorkerError: nodeParams.MinerWorkerError, PieceLength: nodeParams.PieceLength, PieceSectorID: nodeParams.PieceSectorID, - PublishDealID: nodeParams.PublishDealID, PublishDealsError: nodeParams.PublishDealsError, OnDealCompleteError: nodeParams.OnDealCompleteError, LocatePieceForDealWithinSectorError: nodeParams.LocatePieceForDealWithinSectorError, diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 5e118bde..a3e4bda0 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io/ioutil" + "math/rand" "reflect" "testing" "time" @@ -13,6 +14,7 @@ import ( graphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -50,7 +52,7 @@ func TestMakeDeal(t *testing.T) { result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) proposalCid := result.ProposalCid - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond) defer canc() @@ -160,7 +162,7 @@ func TestMakeDealNonBlocking(t *testing.T) { h.ClientNode.AddFundsCid = testCids[0] h.Client.Run(ctx) - h.ProviderNode.BlockOnMessageWait = true + h.ProviderNode.WaitForMessageBlocks = true h.ProviderNode.AddFundsCid = testCids[1] err := h.Provider.Start(ctx) assert.NoError(t, err) @@ -207,13 +209,22 @@ func newHarness(t *testing.T, ctx context.Context) *harness { ClientAddr: address.TestAddress, } + expDealID := abi.DealID(rand.Uint64()) + psdReturn := market.PublishStorageDealsReturn{IDs: []abi.DealID{expDealID}} + psdReturnBytes := bytes.NewBuffer([]byte{}) + err := psdReturn.MarshalCBOR(psdReturnBytes) + assert.NoError(t, err) + providerAddr := address.TestAddress2 tempPath, err := ioutil.TempDir("", "storagemarket_test") assert.NoError(t, err) ps := piecestore.NewPieceStore(td.Ds2) providerNode := &testnodes.FakeProviderNode{ - FakeCommonNode: testnodes.FakeCommonNode{SMState: smState, BlockOnMessageWait: true}, - MinerAddr: providerAddr, + FakeCommonNode: testnodes.FakeCommonNode{ + SMState: smState, + WaitForMessageRetBytes: psdReturnBytes.Bytes(), + }, + MinerAddr: providerAddr, } fs, err := filestore.NewLocalFileStore(filestore.OsPath(tempPath)) assert.NoError(t, err) diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index 2f0bdcb8..454f1ccf 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -88,13 +88,16 @@ func (sma *StorageMarketState) AddDeal(deal storagemarket.StorageDeal) (shared.T // FakeCommonNode has the common methods for the storage & client node adapters type FakeCommonNode struct { SMState *StorageMarketState - BlockOnMessageWait bool AddFundsCid cid.Cid EnsureFundsError error VerifySignatureFails bool GetBalanceError error GetChainHeadError error - WaitForMessageExit exitcode.ExitCode + + WaitForMessageBlocks bool + WaitForMessageError error + WaitForMessageExitCode exitcode.ExitCode + WaitForMessageRetBytes []byte } // GetChainHead returns the state id in the storage market state @@ -126,10 +129,15 @@ func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.A } func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error { - if n.BlockOnMessageWait { + if n.WaitForMessageError != nil { + return n.WaitForMessageError + } + + if n.WaitForMessageBlocks { time.Sleep(5 * time.Second) } - return onCompletion(n.WaitForMessageExit, nil) + + return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes) } // GetBalance returns the funds in the storage market state @@ -226,9 +234,9 @@ func (n *FakeProviderNode) PublishDeals(ctx context.Context, deal storagemarket. n.SMState.AddDeal(sd) - return n.PublishDealID, shared_testutil.GenerateCids(1)[0], nil + return 0, shared_testutil.GenerateCids(1)[0], nil } - return abi.DealID(0), cid.Undef, n.PublishDealsError + return 0, cid.Undef, n.PublishDealsError } // ListProviderDeals returns the deals in the storage market state diff --git a/storagemarket/types.go b/storagemarket/types.go index d9eb06fe..39597a1b 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -52,7 +52,8 @@ const ( StorageDealEnsureClientFunds // Ensuring that client funds are sufficient StorageDealProviderFunding // Waiting for funds to appear in Provider balance StorageDealClientFunding // Waiting for funds to appear in Client balance - StorageDealPublishing // Publishing deal to chain + StorageDealPublish // Publishing deal to chain + StorageDealPublishing // Waiting for deal to appear on chain StorageDealError // deal failed with an unexpected error StorageDealCompleted // on provider side, indicates deal is active and info for retrieval is recorded ) @@ -127,6 +128,7 @@ type MinerDeal struct { market.ClientDealProposal ProposalCid cid.Cid AddFundsCid cid.Cid + PublishCid cid.Cid Miner peer.ID Client peer.ID State StorageDealStatus @@ -189,9 +191,15 @@ const ( // ProviderEventSendResponseFailed happens when a response cannot be sent to a deal ProviderEventSendResponseFailed - // ProviderEventDealPublished happens when a deal is succesfully published + // ProviderEventDealPublishInitiated happens when a provider has sent a PublishStorageDeals message to the chain + ProviderEventDealPublishInitiated + + // ProviderEventDealPublished happens when a deal is successfully published ProviderEventDealPublished + // ProviderEventDealPublishError happens when PublishStorageDeals returns a non-ok exit code + ProviderEventDealPublishError + // ProviderEventFileStoreErrored happens when an error occurs accessing the filestore ProviderEventFileStoreErrored From c4bff70090c3e27229d9be80e2fd5e09782b583e Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Fri, 24 Apr 2020 12:58:10 -0700 Subject: [PATCH 3/4] WaitForFunding state tests for Client, Provider --- shared_testutil/test_types.go | 7 +- storagemarket/impl/clientstates/client_fsm.go | 2 +- .../impl/clientstates/client_states_test.go | 162 +++++++++++------- .../providerstates/provider_states_test.go | 83 ++++++++- storagemarket/integration_test.go | 1 + storagemarket/testnodes/testnodes.go | 4 +- 6 files changed, 181 insertions(+), 78 deletions(-) diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index 383d75e6..fb10335e 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -113,6 +113,9 @@ func MakeTestDealPayment() retrievalmarket.DealPayment { // MakeTestUnsignedDealProposal generates a deal proposal with no signature func MakeTestUnsignedDealProposal() market.DealProposal { + start := uint64(rand.Int31()) + end := start + uint64(rand.Int31()) + return market.DealProposal{ PieceCID: GenerateCids(1)[0], PieceSize: abi.PaddedPieceSize(rand.Int63()), @@ -120,8 +123,8 @@ func MakeTestUnsignedDealProposal() market.DealProposal { Client: address.TestAddress, Provider: address.TestAddress2, - StartEpoch: abi.ChainEpoch(rand.Int63()), - EndEpoch: abi.ChainEpoch(rand.Int63()), + StartEpoch: abi.ChainEpoch(start), + EndEpoch: abi.ChainEpoch(end), StoragePricePerEpoch: MakeTestTokenAmount(), ProviderCollateral: MakeTestTokenAmount(), diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 1860736a..c223baf7 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -20,7 +20,7 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ClientEventEnsureFundsFailed). - From(storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFailing). + FromMany(storagemarket.StorageDealClientFunding, storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealFailing). Action(func(deal *storagemarket.ClientDeal, err error) error { deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error() return nil diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 509815c1..22ea861f 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -6,11 +6,13 @@ import ( "fmt" "testing" + "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-statemachine/fsm" fsmtest "github.com/filecoin-project/go-statemachine/fsm/testutil" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" @@ -26,30 +28,56 @@ func TestEnsureFunds(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) require.NoError(t, err) clientDealProposal := tut.MakeTestClientDealProposal() - runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealClientFunding, clientDealProposal) + runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealEnsureClientFunds, clientDealProposal) + addFundsCid := tut.GenerateCids(1)[0] - node := func(ensureFundsErr error) storagemarket.StorageClientNode { - return &testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{ - SMState: testnodes.NewStorageMarketState(), - EnsureFundsError: ensureFundsErr, - }, - } - } - t.Run("EnsureClientFunds succeeds", func(t *testing.T) { - runEnsureFunds(t, node(nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { + t.Run("immediately succeeds", func(t *testing.T) { + runEnsureFunds(t, makeNode(nodeParams{}), nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) + t.Run("succeeds by sending an AddFunds message", func(t *testing.T) { + params := nodeParams{ + AddFundsCid: addFundsCid, + } + runEnsureFunds(t, makeNode(params), nil, nil, nil, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealClientFunding, deal.State) + }) + }) + t.Run("EnsureClientFunds fails", func(t *testing.T) { - runEnsureFunds(t, node(errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) { + n := makeNode(nodeParams{ + EnsureFundsError: errors.New("Something went wrong"), + }) + runEnsureFunds(t, n, nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "adding market funds failed: Something went wrong", deal.Message) }) }) } +func TestWaitForFunding(t *testing.T) { + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) + require.NoError(t, err) + clientDealProposal := tut.MakeTestClientDealProposal() + runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.WaitForFunding, storagemarket.StorageDealClientFunding, clientDealProposal) + + t.Run("succeeds", func(t *testing.T) { + runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.Ok}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) + }) + }) + + t.Run("EnsureClientFunds fails", func(t *testing.T) { + runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, "adding market funds failed: AddFunds exit code: 19", deal.Message) + }) + }) +} + func TestProposeDeal(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) @@ -63,29 +91,21 @@ func TestProposeDeal(t *testing.T) { }) } - node := func() storagemarket.StorageClientNode { - return &testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{ - SMState: testnodes.NewStorageMarketState(), - }, - } - } - t.Run("succeeds", func(t *testing.T) { - runProposeDeal(t, node(), nil, dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { + runProposeDeal(t, makeNode(nodeParams{}), nil, dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealValidating, deal.State) }) }) t.Run("deal stream lookup fails", func(t *testing.T) { - runProposeDeal(t, node(), errors.New("deal stream not found"), nil, nil, func(deal storagemarket.ClientDeal) { + runProposeDeal(t, makeNode(nodeParams{}), errors.New("deal stream not found"), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "miner connection error: deal stream not found", deal.Message) }) }) t.Run("write proposal fails fails", func(t *testing.T) { - runProposeDeal(t, node(), nil, dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) { + runProposeDeal(t, makeNode(nodeParams{}), nil, dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) }) @@ -109,15 +129,6 @@ func TestVerifyResponse(t *testing.T) { }) } - node := func(verifySignatureFails bool) storagemarket.StorageClientNode { - return &testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{ - SMState: testnodes.NewStorageMarketState(), - VerifySignatureFails: verifySignatureFails, - }, - } - } - t.Run("succeeds", func(t *testing.T) { stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ Response: smnet.Response{ @@ -127,7 +138,7 @@ func TestVerifyResponse(t *testing.T) { }, Signature: tut.MakeTestSignature(), })) - runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) require.Equal(t, publishMessage, deal.PublishMessage) }) @@ -135,14 +146,14 @@ func TestVerifyResponse(t *testing.T) { t.Run("deal stream lookup fails", func(t *testing.T) { dealStreamErr := errors.New("deal stream not found") - runVerifyResponse(t, node(false), dealStreamErr, dealStream(nil), nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), dealStreamErr, dealStream(nil), nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "miner connection error: deal stream not found", deal.Message) }) }) t.Run("read response fails", func(t *testing.T) { - runVerifyResponse(t, node(false), nil, dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error reading Response message: read response failed", deal.Message) }) @@ -157,7 +168,7 @@ func TestVerifyResponse(t *testing.T) { }, Signature: tut.MakeTestSignature(), })) - failToVerifyNode := node(true) + failToVerifyNode := makeNode(nodeParams{VerifySignatureFails: true}) runVerifyResponse(t, failToVerifyNode, nil, stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "unable to verify signature on deal response", deal.Message) @@ -173,7 +184,7 @@ func TestVerifyResponse(t *testing.T) { }, Signature: tut.MakeTestSignature(), })) - runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) }) @@ -190,7 +201,7 @@ func TestVerifyResponse(t *testing.T) { Signature: tut.MakeTestSignature(), })) expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) - runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, deal.Message, expErr) }) @@ -206,7 +217,7 @@ func TestVerifyResponse(t *testing.T) { Signature: tut.MakeTestSignature(), })) closeStreamErr := errors.New("something went wrong") - runVerifyResponse(t, node(false), nil, stream, closeStreamErr, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, closeStreamErr, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) }) @@ -221,25 +232,19 @@ func TestValidateDealPublished(t *testing.T) { clientDealProposal := tut.MakeTestClientDealProposal() runValidateDealPublished := makeExecutor(ctx, eventProcessor, clientstates.ValidateDealPublished, storagemarket.StorageDealProposalAccepted, clientDealProposal) - node := func(dealID abi.DealID, validatePublishedErr error) storagemarket.StorageClientNode { - return &testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{ - SMState: testnodes.NewStorageMarketState(), - }, - ValidatePublishedDealID: dealID, - ValidatePublishedError: validatePublishedErr, - } - } - t.Run("succeeds", func(t *testing.T) { - runValidateDealPublished(t, node(abi.DealID(5), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runValidateDealPublished(t, makeNode(nodeParams{ValidatePublishedDealID: abi.DealID(5)}), nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealSealing, deal.State) require.Equal(t, abi.DealID(5), deal.DealID) }) }) t.Run("fails", func(t *testing.T) { - runValidateDealPublished(t, node(abi.DealID(5), errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) { + n := makeNode(nodeParams{ + ValidatePublishedDealID: abi.DealID(5), + ValidatePublishedError: errors.New("Something went wrong"), + }) + runValidateDealPublished(t, n, nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error validating deal published: Something went wrong", deal.Message) }) @@ -253,31 +258,21 @@ func TestVerifyDealActivated(t *testing.T) { clientDealProposal := tut.MakeTestClientDealProposal() runVerifyDealActivated := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealActivated, storagemarket.StorageDealSealing, clientDealProposal) - node := func(syncError error, asyncError error) storagemarket.StorageClientNode { - return &testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{ - SMState: testnodes.NewStorageMarketState(), - }, - DealCommittedSyncError: syncError, - DealCommittedAsyncError: asyncError, - } - } - t.Run("succeeds", func(t *testing.T) { - runVerifyDealActivated(t, node(nil, nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{}), nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealActive, deal.State) }) }) t.Run("fails synchronously", func(t *testing.T) { - runVerifyDealActivated(t, node(errors.New("Something went wrong"), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}), nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong", deal.Message) }) }) t.Run("fails asynchronously", func(t *testing.T) { - runVerifyDealActivated(t, node(nil, errors.New("Something went wrong later")), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}), nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong later", deal.Message) }) @@ -334,6 +329,45 @@ func makeExecutor(ctx context.Context, } } +type nodeParams struct { + AddFundsCid cid.Cid + EnsureFundsError error + VerifySignatureFails bool + GetBalanceError error + GetChainHeadError error + WaitForMessageBlocks bool + WaitForMessageError error + WaitForMessageExitCode exitcode.ExitCode + WaitForMessageRetBytes []byte + ClientAddr address.Address + ValidationError error + ValidatePublishedDealID abi.DealID + ValidatePublishedError error + DealCommittedSyncError error + DealCommittedAsyncError error +} + +func makeNode(params nodeParams) storagemarket.StorageClientNode { + var out testnodes.FakeClientNode + out.SMState = testnodes.NewStorageMarketState() + out.AddFundsCid = params.AddFundsCid + out.EnsureFundsError = params.EnsureFundsError + out.VerifySignatureFails = params.VerifySignatureFails + out.GetBalanceError = params.GetBalanceError + out.GetChainHeadError = params.GetChainHeadError + out.WaitForMessageBlocks = params.WaitForMessageBlocks + out.WaitForMessageError = params.WaitForMessageError + out.WaitForMessageExitCode = params.WaitForMessageExitCode + out.WaitForMessageRetBytes = params.WaitForMessageRetBytes + out.ClientAddr = params.ClientAddr + out.ValidationError = params.ValidationError + out.ValidatePublishedDealID = params.ValidatePublishedDealID + out.ValidatePublishedError = params.ValidatePublishedError + out.DealCommittedSyncError = params.DealCommittedSyncError + out.DealCommittedAsyncError = params.DealCommittedAsyncError + return &out +} + type fakeEnvironment struct { node storagemarket.StorageClientNode dealStream smnet.StorageDealStream diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 1bfccf0c..4f744050 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -237,11 +237,52 @@ func TestVerifyData(t *testing.T) { } } +func TestWaitForFunding(t *testing.T) { + ctx := context.Background() + eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) + require.NoError(t, err) + runWaitForFunding := makeExecutor(ctx, eventProcessor, providerstates.WaitForFunding, storagemarket.StorageDealProviderFunding) + tests := map[string]struct { + nodeParams nodeParams + dealParams dealParams + environmentParams environmentParams + fileStoreParams tut.TestFileStoreParams + pieceStoreParams tut.TestPieceStoreParams + dealInspector func(t *testing.T, deal storagemarket.MinerDeal) + }{ + "succeeds": { + nodeParams: nodeParams{ + WaitForMessageExitCode: exitcode.Ok, + WaitForMessageRetBytes: []byte{}, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealPublish, deal.State) + }, + }, + "AddFunds returns non-ok exit code": { + nodeParams: nodeParams{ + WaitForMessageExitCode: exitcode.ErrInsufficientFunds, + WaitForMessageRetBytes: []byte{}, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, fmt.Sprintf("error calling node: AddFunds exit code: %s", exitcode.ErrInsufficientFunds), deal.Message) + }, + }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + runWaitForFunding(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) + }) + } +} + func TestEnsureProviderFunds(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) runEnsureProviderFunds := makeExecutor(ctx, eventProcessor, providerstates.EnsureProviderFunds, storagemarket.StorageDealEnsureProviderFunds) + cids := tut.GenerateCids(1) tests := map[string]struct { nodeParams nodeParams dealParams dealParams @@ -250,11 +291,23 @@ func TestEnsureProviderFunds(t *testing.T) { pieceStoreParams tut.TestPieceStoreParams dealInspector func(t *testing.T, deal storagemarket.MinerDeal) }{ - "succeeds": { + "succeeds immediately": { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, storagemarket.StorageDealPublish, deal.State) }, }, + "succeeds by sending an AddBalance message": { + dealParams: dealParams{ + ProviderCollateral: abi.NewTokenAmount(1), + }, + nodeParams: nodeParams{ + AddFundsCid: cids[0], + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { + require.Equal(t, storagemarket.StorageDealProviderFunding, deal.State) + require.Equal(t, cids[0], deal.AddFundsCid) + }, + }, "get miner worker fails": { nodeParams: nodeParams{ MinerWorkerError: errors.New("could not get worker"), @@ -323,12 +376,7 @@ func TestWaitForPublish(t *testing.T) { eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) require.NoError(t, err) runWaitForPublish := makeExecutor(ctx, eventProcessor, providerstates.WaitForPublish, storagemarket.StorageDealPublishing) - expDealID := abi.DealID(rand.Uint64()) - - psdReturn := market.PublishStorageDealsReturn{IDs: []abi.DealID{expDealID}} - psdReturnBytes := bytes.NewBuffer([]byte{}) - err = psdReturn.MarshalCBOR(psdReturnBytes) - require.NoError(t, err) + expDealID, psdReturnBytes := generatePublishDealsReturn(t) tests := map[string]struct { nodeParams nodeParams @@ -340,7 +388,7 @@ func TestWaitForPublish(t *testing.T) { }{ "succeeds": { nodeParams: nodeParams{ - WaitForMessageRetBytes: psdReturnBytes.Bytes(), + WaitForMessageRetBytes: psdReturnBytes, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, storagemarket.StorageDealStaged, deal.State) @@ -359,7 +407,7 @@ func TestWaitForPublish(t *testing.T) { }, "SendSignedResponse errors": { nodeParams: nodeParams{ - WaitForMessageRetBytes: psdReturnBytes.Bytes(), + WaitForMessageRetBytes: psdReturnBytes, }, environmentParams: environmentParams{ SendSignedResponseError: errors.New("could not send"), @@ -670,6 +718,17 @@ var defaultMetadataFile = tut.NewTestFile(tut.TestFileParams{ Size: 400, }) +func generatePublishDealsReturn(t *testing.T) (abi.DealID, []byte) { + dealId := abi.DealID(rand.Uint64()) + + psdReturn := market.PublishStorageDealsReturn{IDs: []abi.DealID{dealId}} + psdReturnBytes := bytes.NewBuffer([]byte{}) + err := psdReturn.MarshalCBOR(psdReturnBytes) + require.NoError(t, err) + + return dealId, psdReturnBytes.Bytes() +} + type nodeParams struct { MinerAddr address.Address MinerWorkerError error @@ -678,6 +737,7 @@ type nodeParams struct { TipSetToken shared.TipSetToken ClientMarketBalance abi.TokenAmount ClientMarketBalanceError error + AddFundsCid cid.Cid VerifySignatureFails bool MostRecentStateIDError error PieceLength uint64 @@ -700,6 +760,7 @@ type dealParams struct { DealID abi.DealID DataRef *storagemarket.DataRef StoragePricePerEpoch abi.TokenAmount + ProviderCollateral abi.TokenAmount PieceSize abi.PaddedPieceSize StartEpoch abi.ChainEpoch EndEpoch abi.ChainEpoch @@ -758,6 +819,7 @@ func makeExecutor(ctx context.Context, GetBalanceError: nodeParams.ClientMarketBalanceError, VerifySignatureFails: nodeParams.VerifySignatureFails, EnsureFundsError: nodeParams.EnsureFundsError, + AddFundsCid: nodeParams.AddFundsCid, WaitForMessageBlocks: nodeParams.WaitForMessageBlocks, WaitForMessageError: nodeParams.WaitForMessageError, WaitForMessageExitCode: nodeParams.WaitForMessageExitCode, @@ -795,6 +857,9 @@ func makeExecutor(ctx context.Context, if !dealParams.StoragePricePerEpoch.Nil() { proposal.StoragePricePerEpoch = dealParams.StoragePricePerEpoch } + if !dealParams.ProviderCollateral.Nil() { + proposal.ProviderCollateral = dealParams.ProviderCollateral + } if dealParams.StartEpoch != abi.ChainEpoch(0) { proposal.StartEpoch = dealParams.StartEpoch } diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index a3e4bda0..1693ada2 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -73,6 +73,7 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealTransferring, storagemarket.StorageDealVerifyData, storagemarket.StorageDealEnsureProviderFunds, + storagemarket.StorageDealPublish, storagemarket.StorageDealPublishing, storagemarket.StorageDealStaged, storagemarket.StorageDealSealing, diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index 454f1ccf..5970b79e 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -3,7 +3,6 @@ package testnodes import ( "context" "io" - "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -134,7 +133,8 @@ func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onComple } if n.WaitForMessageBlocks { - time.Sleep(5 * time.Second) + // just leave the test node in this state to simulate a long operation + return nil } return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes) From dc5e00117cfee4ae8fd7902c00eebfe57dacaf4f Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Mon, 27 Apr 2020 16:50:27 -0700 Subject: [PATCH 4/4] Remove deal id from the provider node api PublishDeals --- storagemarket/impl/providerstates/provider_states.go | 3 +-- storagemarket/testnodes/testnodes.go | 6 +++--- storagemarket/types.go | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index d864d348..9bf07b33 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -210,8 +210,7 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor Ref: deal.Ref, } - // TODO: PublishDeals does not return the deal id, change API - _, mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) + mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err)) } diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index 5970b79e..bcab0ca9 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -225,7 +225,7 @@ type FakeProviderNode struct { } // PublishDeals simulates publishing a deal by adding it to the storage market state -func (n *FakeProviderNode) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (abi.DealID, cid.Cid, error) { +func (n *FakeProviderNode) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { if n.PublishDealsError == nil { sd := storagemarket.StorageDeal{ DealProposal: deal.Proposal, @@ -234,9 +234,9 @@ func (n *FakeProviderNode) PublishDeals(ctx context.Context, deal storagemarket. n.SMState.AddDeal(sd) - return 0, shared_testutil.GenerateCids(1)[0], nil + return shared_testutil.GenerateCids(1)[0], nil } - return 0, cid.Undef, n.PublishDealsError + return cid.Undef, n.PublishDealsError } // ListProviderDeals returns the deals in the storage market state diff --git a/storagemarket/types.go b/storagemarket/types.go index 39597a1b..43297334 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -369,8 +369,8 @@ type StorageProviderNode interface { GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) - // Publishes deal on chain - PublishDeals(ctx context.Context, deal MinerDeal) (abi.DealID, cid.Cid, error) + // Publishes deal on chain, returns the message cid, but does not wait for message to appear + PublishDeals(ctx context.Context, deal MinerDeal) (cid.Cid, error) // ListProviderDeals lists all deals associated with a storage provider ListProviderDeals(ctx context.Context, addr address.Address, tok shared.TipSetToken) ([]StorageDeal, error)