diff --git a/docs/retrievalclient.mmd.png b/docs/retrievalclient.mmd.png index 2c0967dc..b135a4cb 100644 Binary files a/docs/retrievalclient.mmd.png and b/docs/retrievalclient.mmd.png differ diff --git a/docs/retrievalclient.mmd.svg b/docs/retrievalclient.mmd.svg index dafad62f..1f687666 100644 --- a/docs/retrievalclient.mmd.svg +++ b/docs/retrievalclient.mmd.svg @@ -1,6 +1,6 @@ -

ClientEventFundsReserved + + + note left of 3 : The following events only record in this state.

ClientEventFundsReleased + + + note left of 11 : The following events only record in this state.

ClientEventFundsReleased + 9 --> [*] 8 --> [*] 26 --> [*] diff --git a/docs/storageclient.mmd.png b/docs/storageclient.mmd.png index 95be3a02..9574b7a1 100644 Binary files a/docs/storageclient.mmd.png and b/docs/storageclient.mmd.png differ diff --git a/docs/storageclient.mmd.svg b/docs/storageclient.mmd.svg index 4164cf66..7d30764e 100644 --- a/docs/storageclient.mmd.svg +++ b/docs/storageclient.mmd.svg @@ -1,6 +1,6 @@ -ClientEventOpenClientEventFundingInitiatedClientEventEnsureFundsFailedClientEventEnsureFundsFailedClientEventFundsEnsuredClientEventFundsEnsuredClientEventWriteProposalFailedClientEventReadResponseFailedClientEventResponseVerificationFailedClientEventInitiateDataTransferClientEventUnexpectedDealStateClientEventDataTransferFailedClientEventDataTransferFailedClientEventDataTransferInitiatedClientEventDataTransferCompleteClientEventDataTransferCompleteClientEventWaitForDealStateClientEventResponseDealDidNotMatchClientEventDealRejectedClientEventDealAcceptedClientEventDealPublishFailedClientEventDealPublishedClientEventDealActivationFailedClientEventDealActivatedClientEventDealSlashedClientEventDealExpiredClientEventDealCompletionFailedClientEventFailedStorageDealUnknownStorageDealProposalAcceptedOn entry runs ValidateDealPublishedStorageDealSealingOn entry runs VerifyDealActivatedStorageDealActiveOn entry runs WaitForDealCompletionStorageDealExpiredStorageDealSlashedStorageDealFailingOn entry runs FailDealStorageDealFundsEnsuredOn entry runs ProposeDealStorageDealCheckForAcceptanceOn entry runs CheckForDealAcceptanceStorageDealStartDataTransferOn entry runs InitiateDataTransferStorageDealTransferringStorageDealEnsureClientFundsOn entry runs EnsureClientFundsStorageDealClientFundingOn entry runs WaitForFundingStorageDealErrorThe following events are not shown cause they can trigger from any state.ClientEventStreamCloseError - transitions state to StorageDealErrorClientEventRestart - does not transition stateThe following events only record in this state.ClientEventFundsReservedThe following events only record in this state.ClientEventFundsReleasedThe following events only record in this state.ClientEventFundsReleased \ No newline at end of file diff --git a/docs/storageprovider.mmd b/docs/storageprovider.mmd index bac88c01..6ee5eecb 100644 --- a/docs/storageprovider.mmd +++ b/docs/storageprovider.mmd @@ -78,6 +78,16 @@ stateDiagram-v2 10 --> 26 : ProviderEventRestart 14 --> 26 : ProviderEventRestart 15 --> 26 : ProviderEventRestart + 20 --> 11 : ProviderEventTrackFundsFailed + + note left of 20 : The following events only record in this state.

ProviderEventFundsReserved + + + note left of 11 : The following events only record in this state.

ProviderEventFundsReleased + + + note left of 25 : The following events only record in this state.

ProviderEventFundsReleased + 26 --> [*] 9 --> [*] 8 --> [*] diff --git a/docs/storageprovider.mmd.png b/docs/storageprovider.mmd.png index 39c7d2b3..481f74a1 100644 Binary files a/docs/storageprovider.mmd.png and b/docs/storageprovider.mmd.png differ diff --git a/docs/storageprovider.mmd.svg b/docs/storageprovider.mmd.svg index 164a23bd..b7f64f48 100644 --- a/docs/storageprovider.mmd.svg +++ b/docs/storageprovider.mmd.svg @@ -1,6 +1,6 @@ -ProviderEventOpenProviderEventDealRejectedProviderEventDealRejectedProviderEventDealRejectedProviderEventRejectionSentProviderEventDealDecidingProviderEventDataRequestedProviderEventDataTransferFailedProviderEventDataTransferInitiatedProviderEventDataTransferCompletedProviderEventDataVerificationFailedProviderEventVerifiedDataProviderEventVerifiedDataProviderEventFundingInitiatedProviderEventFundedProviderEventFundedProviderEventDealPublishInitiatedProviderEventDealPublishErrorProviderEventSendResponseFailedProviderEventSendResponseFailedProviderEventDealPublishedProviderEventFileStoreErroredProviderEventFileStoreErroredProviderEventFileStoreErroredProviderEventMultistoreErroredProviderEventDealHandoffFailedProviderEventDealHandedOffProviderEventDealActivationFailedProviderEventDealActivatedProviderEventPieceStoreErroredProviderEventUnableToLocatePieceProviderEventReadMetadataErroredProviderEventPieceRecordedProviderEventDealSlashedProviderEventDealExpiredProviderEventDealCompletionFailedProviderEventFailedProviderEventRestartProviderEventRestartProviderEventRestartProviderEventTrackFundsFailedStorageDealUnknownStorageDealStagedOn entry runs HandoffDealStorageDealSealingOn entry runs VerifyDealActivated<invalid Value>On entry runs RecordPieceInfoStorageDealActiveOn entry runs WaitForDealCompletionStorageDealExpiredStorageDealSlashedStorageDealRejectingOn entry runs RejectDealStorageDealFailingOn entry runs FailDealStorageDealValidatingOn entry runs ValidateDealProposalStorageDealAcceptWaitOn entry runs DecideOnProposalStorageDealTransferringStorageDealWaitingForDataStorageDealVerifyDataOn entry runs VerifyDataStorageDealEnsureProviderFundsOn entry runs EnsureProviderFundsStorageDealProviderFundingOn entry runs WaitForFundingStorageDealPublishOn entry runs PublishDealStorageDealPublishingOn entry runs WaitForPublishStorageDealErrorThe following events are not shown cause they can trigger from any state.ProviderEventNodeErrored - transitions state to StorageDealFailingProviderEventRestart - does not transition stateThe following events only record in this state.ProviderEventFundsReservedThe following events only record in this state.ProviderEventFundsReleasedThe following events only record in this state.ProviderEventFundsReleased \ No newline at end of file diff --git a/go.mod b/go.mod index 07669f21..fbf2c8f1 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.5.1 github.com/filecoin-project/go-multistore v0.0.2 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 - github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989 + github.com/filecoin-project/go-statemachine v0.0.0-20200730031800-c3336614d2a7 github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6 diff --git a/go.sum b/go.sum index 0b8b7bf5..3b084ffb 100644 --- a/go.sum +++ b/go.sum @@ -122,6 +122,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyC github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989 h1:1GjCS3xy/CRIw7Tq0HfzX6Al8mklrszQZ3iIFnjPzHk= github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= +github.com/filecoin-project/go-statemachine v0.0.0-20200730031800-c3336614d2a7 h1:KAF3WM/xSnl6G6RHX8vDJthg4+e4PSgBh72//6c6Qvc= +github.com/filecoin-project/go-statemachine v0.0.0-20200730031800-c3336614d2a7/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index cc6c0289..3129e666 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -40,6 +40,7 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" stormkt "github.com/filecoin-project/go-fil-markets/storagemarket/impl" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" stornet "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -262,6 +263,9 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { peerResolver := discovery.NewLocal(td.Ds1) + clientDealFunds, err := funds.NewDealFunds(td.Ds1, datastore.NewKey("storage/client/dealfunds")) + require.NoError(t, err) + client, err := stormkt.NewClient( stornet.NewFromLibp2pHost(td.Host1), td.Bs1, @@ -270,6 +274,7 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { peerResolver, td.Ds1, &clientNode, + clientDealFunds, stormkt.DealPollingInterval(0), ) require.NoError(t, err) @@ -281,9 +286,11 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { require.NoError(t, err) rv2 := requestvalidation.NewUnifiedRequestValidator(statestore.New(td.Ds2), nil) require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, rv2)) - storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr) require.NoError(t, err) + providerDealFunds, err := funds.NewDealFunds(td.Ds1, datastore.NewKey("storage/provider/dealfunds")) + require.NoError(t, err) + provider, err := stormkt.NewProvider( stornet.NewFromLibp2pHost(td.Host2), td.Ds2, @@ -295,6 +302,7 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { providerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, storedAsk, + providerDealFunds, ) require.NoError(t, err) diff --git a/shared_testutil/test_deal_funds.go b/shared_testutil/test_deal_funds.go new file mode 100644 index 00000000..8031fb76 --- /dev/null +++ b/shared_testutil/test_deal_funds.go @@ -0,0 +1,38 @@ +package shared_testutil + +import ( + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" +) + +func NewTestDealFunds() *TestDealFunds { + return &TestDealFunds{ + reserved: big.Zero(), + } +} + +type TestDealFunds struct { + reserved abi.TokenAmount + ReserveCalls []abi.TokenAmount + ReleaseCalls []abi.TokenAmount +} + +func (f *TestDealFunds) Get() abi.TokenAmount { + return f.reserved +} + +func (f *TestDealFunds) Reserve(amount abi.TokenAmount) (abi.TokenAmount, error) { + f.reserved = big.Add(f.reserved, amount) + f.ReserveCalls = append(f.ReserveCalls, amount) + return f.reserved, nil +} + +func (f *TestDealFunds) Release(amount abi.TokenAmount) (abi.TokenAmount, error) { + f.reserved = big.Sub(f.reserved, amount) + f.ReleaseCalls = append(f.ReleaseCalls, amount) + return f.reserved, nil +} + +var _ funds.DealFunds = &TestDealFunds{} diff --git a/storagemarket/events.go b/storagemarket/events.go index c24ef62d..27c2eb7c 100644 --- a/storagemarket/events.go +++ b/storagemarket/events.go @@ -13,6 +13,12 @@ const ( // ClientEventFundingInitiated happens when a client has sent a message adding funds to its balance ClientEventFundingInitiated + // ClientEventFundsReserved happens when a client reserves funds for a deal (updating our tracked funds) + ClientEventFundsReserved + + // ClientEventFundsReleased happens when a client released funds for a deal (updating our tracked funds) + ClientEventFundsReleased + // ClientEventFundsEnsured happens when a client successfully ensures it has funds for a deal ClientEventFundsEnsured @@ -88,6 +94,8 @@ var ClientEvents = map[ClientEvent]string{ ClientEventOpen: "ClientEventOpen", ClientEventEnsureFundsFailed: "ClientEventEnsureFundsFailed", ClientEventFundingInitiated: "ClientEventFundingInitiated", + ClientEventFundsReserved: "ClientEventFundsReserved", + ClientEventFundsReleased: "ClientEventFundsReleased", ClientEventFundsEnsured: "ClientEventFundsEnsured", ClientEventWriteProposalFailed: "ClientEventWriteProposalFailed", ClientEventInitiateDataTransfer: "ClientEventInitiateDataTransfer", @@ -138,6 +146,12 @@ const ( // ProviderEventInsufficientFunds indicates not enough funds available for a deal ProviderEventInsufficientFunds + // ProviderEventFundsReserved indicates we've reserved funds for a deal, adding to our overall total + ProviderEventFundsReserved + + // ProviderEventFundsReleased indicates we've released funds for a deal + ProviderEventFundsReleased + // ProviderEventFundingInitiated indicates provider collateral funding has been initiated ProviderEventFundingInitiated @@ -220,6 +234,9 @@ const ( // ProviderEventFailed indicates a deal has failed and should no longer be processed ProviderEventFailed + // ProviderEventTrackFundsFailed indicates a failure trying to locally track funds needed for deals + ProviderEventTrackFundsFailed + // ProviderEventRestart is used to resume the deal after a state machine shutdown ProviderEventRestart ) @@ -233,6 +250,8 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventDealAccepted: "ProviderEventDealAccepted", ProviderEventDealDeciding: "ProviderEventDealDeciding", ProviderEventInsufficientFunds: "ProviderEventInsufficientFunds", + ProviderEventFundsReserved: "ProviderEventFundsReserved", + ProviderEventFundsReleased: "ProviderEventFundsReleased", ProviderEventFundingInitiated: "ProviderEventFundingInitiated", ProviderEventFunded: "ProviderEventFunded", ProviderEventDataTransferFailed: "ProviderEventDataTransferFailed", @@ -260,5 +279,6 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventDealExpired: "ProviderEventDealExpired", ProviderEventDealSlashed: "ProviderEventDealSlashed", ProviderEventFailed: "ProviderEventFailed", + ProviderEventTrackFundsFailed: "ProviderEventTrackFundsFailed", ProviderEventRestart: "ProviderEventRestart", } diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 1c0884be..44efad46 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -33,6 +33,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -56,6 +57,7 @@ type Client struct { pubSub *pubsub.PubSub statemachines fsm.Group pollingInterval time.Duration + dealFunds funds.DealFunds } // StorageClientOption allows custom configuration of a storage client @@ -78,6 +80,7 @@ func NewClient( discovery *discovery.Local, ds datastore.Batching, scn storagemarket.StorageClientNode, + dealFunds funds.DealFunds, options ...StorageClientOption, ) (*Client, error) { carIO := cario.NewCarIO() @@ -91,6 +94,7 @@ func NewClient( pio: pio, pubSub: pubsub.New(clientDispatcher), pollingInterval: DefaultPollingInterval, + dealFunds: dealFunds, } statemachines, err := newClientStateMachine( @@ -544,6 +548,10 @@ func (c *clientDealEnvironment) PollingInterval() time.Duration { return c.c.pollingInterval } +func (c *clientDealEnvironment) DealFunds() funds.DealFunds { + return c.c.dealFunds +} + type clientStoreGetter struct { c *Client } diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 0e6e7d07..b31e2b82 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/go-fil-markets/storagemarket" ) @@ -26,6 +27,22 @@ var ClientEvents = fsm.Events{ deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error() return nil }), + fsm.Event(storagemarket.ClientEventFundsReserved). + From(storagemarket.StorageDealEnsureClientFunds).ToJustRecord(). + Action(func(deal *storagemarket.ClientDeal, fundsReserved abi.TokenAmount) error { + if deal.FundsReserved.Nil() { + deal.FundsReserved = fundsReserved + } else { + deal.FundsReserved = big.Add(deal.FundsReserved, fundsReserved) + } + return nil + }), + fsm.Event(storagemarket.ClientEventFundsReleased). + FromMany(storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealFailing).ToJustRecord(). + Action(func(deal *storagemarket.ClientDeal, fundsReleased abi.TokenAmount) error { + deal.FundsReserved = big.Subtract(deal.FundsReserved, fundsReleased) + return nil + }), fsm.Event(storagemarket.ClientEventFundsEnsured). FromMany(storagemarket.StorageDealEnsureClientFunds, storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFundsEnsured), fsm.Event(storagemarket.ClientEventWriteProposalFailed). @@ -67,9 +84,9 @@ var ClientEvents = fsm.Events{ fsm.Event(storagemarket.ClientEventWaitForDealState). From(storagemarket.StorageDealCheckForAcceptance).ToNoChange(). Action(func(deal *storagemarket.ClientDeal, pollError bool) error { - deal.PollRetryCount += 1 + deal.PollRetryCount++ if pollError { - deal.PollErrorCount += 1 + deal.PollErrorCount++ } return nil }), diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index 13e269b1..da070124 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -32,6 +33,7 @@ type ClientDealEnvironment interface { StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error) PollingInterval() time.Duration + DealFunds() funds.DealFunds } // ClientStateEntryFunc is the type for all state entry functions on a storage client @@ -46,7 +48,19 @@ func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("acquiring chain head: %w", err)) } - mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok) + var requiredFunds abi.TokenAmount + if deal.FundsReserved.Nil() || deal.FundsReserved.IsZero() { + requiredFunds, err = environment.DealFunds().Reserve(deal.Proposal.ClientBalanceRequirement()) + if err != nil { + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("tracking deal funds: %w", err)) + } + } else { + requiredFunds = environment.DealFunds().Get() + } + + _ = ctx.Trigger(storagemarket.ClientEventFundsReserved, deal.Proposal.ClientBalanceRequirement()) + + mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, requiredFunds, tok) if err != nil { return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err) @@ -191,6 +205,15 @@ func ValidateDealPublished(ctx fsm.Context, environment ClientDealEnvironment, d return ctx.Trigger(storagemarket.ClientEventDealPublishFailed, err) } + if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { + _, err = environment.DealFunds().Release(deal.FundsReserved) + if err != nil { + // nonfatal error + log.Warnf("failed to release funds from local tracker: %s", err) + } + _ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved) + } + return ctx.Trigger(storagemarket.ClientEventDealPublished, dealID) } @@ -242,6 +265,15 @@ func WaitForDealCompletion(ctx fsm.Context, environment ClientDealEnvironment, d // FailDeal cleans up a failing deal func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { + if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { + _, err := environment.DealFunds().Release(deal.FundsReserved) + if err != nil { + // nonfatal error + log.Warnf("failed to release funds from local tracker: %s", err) + } + _ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved) + } + // TODO: store in some sort of audit log log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message) diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 4636db4a..6fd035a9 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -24,6 +24,7 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) @@ -35,6 +36,21 @@ func TestEnsureFunds(t *testing.T) { runAndInspect(t, storagemarket.StorageDealEnsureClientFunds, clientstates.EnsureClientFunds, testCase{ inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) + }, + }) + }) + t.Run("resume, funds reserved prior", func(t *testing.T) { + runAndInspect(t, storagemarket.StorageDealEnsureClientFunds, clientstates.EnsureClientFunds, testCase{ + stateParams: dealStateParams{ + reserveFunds: true, + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) + assert.Len(t, env.dealFunds.ReserveCalls, 0) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) }, }) }) @@ -43,6 +59,9 @@ func TestEnsureFunds(t *testing.T) { nodeParams: nodeParams{AddFundsCid: tut.GenerateCids(1)[0]}, inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State) + assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) }, }) }) @@ -54,6 +73,9 @@ func TestEnsureFunds(t *testing.T) { inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) assert.Equal(t, "adding market funds failed: Something went wrong", deal.Message) + assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) }, }) }) @@ -339,9 +361,24 @@ func TestValidateDealPublished(t *testing.T) { t.Run("succeeds", func(t *testing.T) { runAndInspect(t, storagemarket.StorageDealProposalAccepted, clientstates.ValidateDealPublished, testCase{ nodeParams: nodeParams{ValidatePublishedDealID: abi.DealID(5)}, + stateParams: dealStateParams{ + reserveFunds: true, + }, inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) assert.Equal(t, abi.DealID(5), deal.DealID) + assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) + }, + }) + }) + t.Run("succeeds, funds already released", func(t *testing.T) { + runAndInspect(t, storagemarket.StorageDealProposalAccepted, clientstates.ValidateDealPublished, testCase{ + nodeParams: nodeParams{ValidatePublishedDealID: abi.DealID(5)}, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) + assert.Equal(t, abi.DealID(5), deal.DealID) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) }, }) }) @@ -435,6 +472,30 @@ func TestWaitForDealCompletion(t *testing.T) { }) } +func TestFailDeal(t *testing.T) { + t.Run("releases funds", func(t *testing.T) { + runAndInspect(t, storagemarket.StorageDealFailing, clientstates.FailDeal, testCase{ + stateParams: dealStateParams{ + reserveFunds: true, + }, + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) + }, + }) + }) + t.Run("funds already released", func(t *testing.T) { + runAndInspect(t, storagemarket.StorageDealFailing, clientstates.FailDeal, testCase{ + inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) + }, + }) + }) +} + type envParams struct { dealStream *tut.TestStorageDealStream startDataTransferError error @@ -446,6 +507,7 @@ type envParams struct { type dealStateParams struct { addFundsCid *cid.Cid + reserveFunds bool fastRetrieval bool } @@ -473,7 +535,9 @@ func makeExecutor(ctx context.Context, if dealParams.addFundsCid != nil { dealState.AddFundsCid = dealParams.addFundsCid } - + if dealParams.reserveFunds { + dealState.FundsReserved = clientDealProposal.Proposal.ClientBalanceRequirement() + } environment := &fakeEnvironment{ node: node, dealStream: envParams.dealStream, @@ -481,6 +545,7 @@ func makeExecutor(ctx context.Context, providerDealState: envParams.providerDealState, getDealStatusErr: envParams.getDealStatusErr, pollingInterval: envParams.pollingInterval, + dealFunds: tut.NewTestDealFunds(), } if environment.pollingInterval == 0 { @@ -551,6 +616,7 @@ type fakeEnvironment struct { providerDealState *storagemarket.ProviderDealState getDealStatusErr error pollingInterval time.Duration + dealFunds *tut.TestDealFunds } type dataTransferParams struct { @@ -593,6 +659,10 @@ func (fe *fakeEnvironment) PollingInterval() time.Duration { return fe.pollingInterval } +func (fe *fakeEnvironment) DealFunds() funds.DealFunds { + return fe.dealFunds +} + var _ clientstates.ClientDealEnvironment = &fakeEnvironment{} type responseParams struct { diff --git a/storagemarket/impl/funds/funds.go b/storagemarket/impl/funds/funds.go new file mode 100644 index 00000000..0d1c7d14 --- /dev/null +++ b/storagemarket/impl/funds/funds.go @@ -0,0 +1,107 @@ +package funds + +import ( + "bytes" + "sync" + + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" +) + +// DealFunds is used to track funds needed for (possibly multiple) deals in progress +type DealFunds interface { + // returns the current amount tracked + Get() abi.TokenAmount + + // Reserve is used to mark funds as "in-use" for a deal + // returns the new amount tracked + Reserve(amount abi.TokenAmount) (abi.TokenAmount, error) + + // Release releases reserved committed funds back to the available pool + // returns total amount reserved afterwards + Release(amount abi.TokenAmount) (abi.TokenAmount, error) +} + +type dealFundsImpl struct { + lock sync.Mutex + + // cached value + reserved abi.TokenAmount + + key datastore.Key + ds datastore.Batching +} + +func NewDealFunds(ds datastore.Batching, key datastore.Key) (DealFunds, error) { + df := &dealFundsImpl{ + ds: ds, + key: key, + } + + value, err := df.loadReserved() + if err != nil { + return nil, err + } + + df.reserved = value + + return df, nil +} + +func (f *dealFundsImpl) Get() abi.TokenAmount { + return f.reserved +} + +func (f *dealFundsImpl) Reserve(amount abi.TokenAmount) (abi.TokenAmount, error) { + f.lock.Lock() + defer f.lock.Unlock() + + return f.storeReserved(big.Add(f.reserved, amount)) +} + +func (f *dealFundsImpl) Release(amount abi.TokenAmount) (abi.TokenAmount, error) { + f.lock.Lock() + defer f.lock.Unlock() + + return f.storeReserved(big.Sub(f.reserved, amount)) +} + +// loadReserved will try to load our reserved value from the datastore +// if it cannot find our key, it will return zero +func (f *dealFundsImpl) loadReserved() (abi.TokenAmount, error) { + b, err := f.ds.Get(f.key) + if err != nil { + if xerrors.Is(err, datastore.ErrNotFound) { + f.reserved = big.Zero() + return f.reserved, nil + } + return abi.TokenAmount{}, err + } + + var value abi.TokenAmount + if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil { + return abi.TokenAmount{}, err + } + + f.reserved = value + return f.reserved, nil +} + +// stores the new reserved value and returns it +func (f *dealFundsImpl) storeReserved(amount abi.TokenAmount) (abi.TokenAmount, error) { + var buf bytes.Buffer + err := amount.MarshalCBOR(&buf) + if err != nil { + return abi.TokenAmount{}, err + } + + if err := f.ds.Put(f.key, buf.Bytes()); err != nil { + return abi.TokenAmount{}, err + } + + f.reserved = amount + return f.reserved, nil +} diff --git a/storagemarket/impl/funds/funds_test.go b/storagemarket/impl/funds/funds_test.go new file mode 100644 index 00000000..47ff3ff2 --- /dev/null +++ b/storagemarket/impl/funds/funds_test.go @@ -0,0 +1,49 @@ +package funds_test + +import ( + "testing" + + "github.com/ipfs/go-datastore" + dss "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" +) + +func TestDealFunds(t *testing.T) { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + key := datastore.NewKey("deal_funds_test") + + f, err := funds.NewDealFunds(ds, key) + assert.NoError(t, err) + + // initializes to zero + assert.Equal(t, f.Get(), big.Zero()) + + // reserve funds and return new total + newAmount, err := f.Reserve(abi.NewTokenAmount(123)) + assert.NoError(t, err) + assert.Equal(t, abi.NewTokenAmount(123), newAmount) + assert.Equal(t, abi.NewTokenAmount(123), f.Get()) + + // reserve more funds and return new total + newAmount, err = f.Reserve(abi.NewTokenAmount(100)) + assert.NoError(t, err) + assert.Equal(t, abi.NewTokenAmount(223), newAmount) + assert.Equal(t, abi.NewTokenAmount(223), f.Get()) + + // release funds and return new total + newAmount, err = f.Release(abi.NewTokenAmount(123)) + assert.NoError(t, err) + assert.Equal(t, abi.NewTokenAmount(100), newAmount) + assert.Equal(t, abi.NewTokenAmount(100), f.Get()) + + // creating new funds will read stored value + f, err = funds.NewDealFunds(ds, key) + assert.NoError(t, err) + assert.Equal(t, abi.NewTokenAmount(100), newAmount) + assert.Equal(t, abi.NewTokenAmount(100), f.Get()) +} diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 479120b9..9b772f79 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" @@ -56,6 +57,7 @@ type Provider struct { pieceStore piecestore.PieceStore conns *connmanager.ConnManager storedAsk StoredAsk + dealFunds funds.DealFunds actor address.Address dataTransfer datatransfer.Manager universalRetrievalEnabled bool @@ -103,6 +105,7 @@ func NewProvider(net network.StorageMarketNetwork, minerAddress address.Address, rt abi.RegisteredSealProof, storedAsk StoredAsk, + dealFunds funds.DealFunds, options ...StorageProviderOption, ) (storagemarket.StorageProvider, error) { carIO := cario.NewCarIO() @@ -118,6 +121,7 @@ func NewProvider(net network.StorageMarketNetwork, pieceStore: pieceStore, conns: connmanager.NewConnManager(), storedAsk: storedAsk, + dealFunds: dealFunds, actor: minerAddress, dataTransfer: dataTransfer, pubSub: pubsub.New(providerDispatcher), @@ -660,6 +664,10 @@ func (p *providerDealEnvironment) RunCustomDecisionLogic(ctx context.Context, de return p.p.customDealDeciderFunc(ctx, deal) } +func (p *providerDealEnvironment) DealFunds() funds.DealFunds { + return p.p.dealFunds +} + var _ providerstates.ProviderDealEnvironment = &providerDealEnvironment{} type providerStoreGetter struct { diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 6e5bcd58..82605d13 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -156,6 +157,29 @@ var ProviderEvents = fsm.Events{ fsm.Event(storagemarket.ProviderEventRestart). FromMany(storagemarket.StorageDealValidating, storagemarket.StorageDealAcceptWait, storagemarket.StorageDealRejecting).To(storagemarket.StorageDealError). FromAny().ToNoChange(), + + fsm.Event(storagemarket.ProviderEventTrackFundsFailed). + From(storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealFailing). + Action(func(deal *storagemarket.MinerDeal, err error) error { + deal.Message = xerrors.Errorf("error tracking deal funds: %w", err).Error() + return nil + }), + fsm.Event(storagemarket.ProviderEventFundsReserved). + From(storagemarket.StorageDealEnsureProviderFunds).ToJustRecord(). + Action(func(deal *storagemarket.MinerDeal, fundsReserved abi.TokenAmount) error { + if deal.FundsReserved.Nil() { + deal.FundsReserved = fundsReserved + } else { + deal.FundsReserved = big.Add(deal.FundsReserved, fundsReserved) + } + return nil + }), + fsm.Event(storagemarket.ProviderEventFundsReleased). + FromMany(storagemarket.StorageDealPublishing, storagemarket.StorageDealFailing).ToJustRecord(). + Action(func(deal *storagemarket.MinerDeal, fundsReleased abi.TokenAmount) error { + deal.FundsReserved = big.Subtract(deal.FundsReserved, fundsReleased) + return nil + }), } // ProviderStateEntryFuncs are the handlers for different states in a storage client diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 8aad9d3b..0869436e 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -42,6 +43,7 @@ type ProviderDealEnvironment interface { FileStore() filestore.FileStore PieceStore() piecestore.PieceStore RunCustomDecisionLogic(context.Context, storagemarket.MinerDeal) (bool, string, error) + DealFunds() funds.DealFunds } // ProviderStateEntryFunc is the signature for a StateEntryFunc in the provider FSM @@ -171,8 +173,19 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) } + var requiredFunds abi.TokenAmount + if deal.FundsReserved.Nil() || deal.FundsReserved.IsZero() { + requiredFunds, err = environment.DealFunds().Reserve(deal.Proposal.ProviderCollateral) + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventTrackFundsFailed, xerrors.Errorf("tracking deal funds: %w", err)) + } + } else { + requiredFunds = environment.DealFunds().Get() + } + + _ = ctx.Trigger(storagemarket.ProviderEventFundsReserved, deal.Proposal.ProviderCollateral) - mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok) + mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, requiredFunds, tok) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) @@ -234,6 +247,15 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) } + if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { + _, err = environment.DealFunds().Release(deal.FundsReserved) + if err != nil { + // nonfatal error + log.Warnf("failed to release funds from local tracker: %s", err) + } + _ = ctx.Trigger(storagemarket.ProviderEventFundsReleased, deal.FundsReserved) + } + return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) }) } @@ -419,5 +441,14 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage log.Warnf("deleting store id %d: %w", *deal.StoreID, err) } } + if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { + _, err := environment.DealFunds().Release(deal.FundsReserved) + if err != nil { + // nonfatal error + log.Warnf("failed to release funds from local tracker: %s", err) + } + _ = ctx.Trigger(storagemarket.ProviderEventFundsReleased, deal.FundsReserved) + } + return ctx.Trigger(storagemarket.ProviderEventFailed) } diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 74fc8e94..e17d7ff9 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/xerrors" @@ -29,6 +30,7 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" @@ -338,6 +340,19 @@ func TestEnsureProviderFunds(t *testing.T) { "succeeds immediately": { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) + require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.dealFunds.ReleaseCalls, 0) + require.Equal(t, deal.Proposal.ProviderBalanceRequirement(), deal.FundsReserved) + }, + }, + "succeeds, funds already reserved": { + dealParams: dealParams{ + ReserveFunds: true, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) + require.Len(t, env.dealFunds.ReserveCalls, 0) + require.Len(t, env.dealFunds.ReleaseCalls, 0) }, }, "succeeds by sending an AddBalance message": { @@ -350,6 +365,8 @@ func TestEnsureProviderFunds(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealProviderFunding, deal.State) require.Equal(t, &cids[0], deal.AddFundsCid) + require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.dealFunds.ReleaseCalls, 0) }, }, "get miner worker fails": { @@ -368,6 +385,8 @@ func TestEnsureProviderFunds(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: ensuring funds: not enough funds", deal.Message) + require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.dealFunds.ReleaseCalls, 0) }, }, } @@ -429,12 +448,27 @@ func TestWaitForPublish(t *testing.T) { dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) }{ "succeeds": { + dealParams: dealParams{ + ReserveFunds: true, + }, + nodeParams: nodeParams{ + WaitForMessageRetBytes: psdReturnBytes, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealStaged, deal.State) + require.Equal(t, expDealID, deal.DealID) + assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) + assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) + }, + }, + "succeeds, funds already released": { nodeParams: nodeParams{ WaitForMessageRetBytes: psdReturnBytes, }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealStaged, deal.State) require.Equal(t, expDealID, deal.DealID) + assert.Len(t, env.dealFunds.ReleaseCalls, 0) }, }, "PublishStorageDeal errors": { @@ -773,6 +807,16 @@ func TestFailDeal(t *testing.T) { tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) }, }, + "succeeds, funds released": { + dealParams: dealParams{ + ReserveFunds: true, + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) + assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) + assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) + }, + }, "succeeds, file deletions": { dealParams: dealParams{ PiecePath: defaultPath, @@ -808,7 +852,7 @@ var defaultClientAddress = address.TestAddress var defaultProviderAddress = address.TestAddress2 var defaultMinerAddr, _ = address.NewActorAddress([]byte("miner")) var defaultClientCollateral = abi.NewTokenAmount(0) -var defaultProviderCollateral = abi.NewTokenAmount(0) +var defaultProviderCollateral = abi.NewTokenAmount(10000) var defaultDataRef = storagemarket.DataRef{ Root: tut.GenerateCids(1)[0], TransferType: storagemarket.TTGraphsync, @@ -889,6 +933,7 @@ type dealParams struct { EndEpoch abi.ChainEpoch FastRetrieval bool VerifiedDeal bool + ReserveFunds bool } type environmentParams struct { @@ -1028,6 +1073,10 @@ func makeExecutor(ctx context.Context, dealState.DealID = dealParams.DealID } dealState.FastRetrieval = dealParams.FastRetrieval + if dealParams.ReserveFunds { + dealState.FundsReserved = proposal.ProviderCollateral + } + fs := tut.NewTestFileStore(fileStoreParams) pieceStore := tut.NewTestPieceStoreWithParams(pieceStoreParams) expectedTags := make(map[string]struct{}) @@ -1053,6 +1102,7 @@ func makeExecutor(ctx context.Context, deleteStoreError: params.DeleteStoreError, fs: fs, pieceStore: pieceStore, + dealFunds: tut.NewTestDealFunds(), } if environment.pieceCid == cid.Undef { environment.pieceCid = defaultPieceCid @@ -1102,6 +1152,7 @@ type fakeEnvironment struct { pieceStore piecestore.PieceStore expectedTags map[string]struct{} receivedTags map[string]struct{} + dealFunds *tut.TestDealFunds } func (fe *fakeEnvironment) Address() address.Address { @@ -1148,3 +1199,7 @@ func (fe *fakeEnvironment) PieceStore() piecestore.PieceStore { func (fe *fakeEnvironment) RunCustomDecisionLogic(context.Context, storagemarket.MinerDeal) (bool, string, error) { return !fe.rejectDeal, fe.rejectReason, fe.decisionError } + +func (fe *fakeEnvironment) DealFunds() funds.DealFunds { + return fe.dealFunds +} diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 8dc02a6f..8ec7c7d9 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -36,6 +36,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -413,6 +414,8 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut require.NoError(t, err) rv1 := requestvalidation.NewUnifiedRequestValidator(nil, statestore.New(td.Ds1)) require.NoError(t, dt1.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, rv1)) + clientDealFunds, err := funds.NewDealFunds(td.Ds1, datastore.NewKey("storage/client/dealfunds")) + require.NoError(t, err) client, err := storageimpl.NewClient( network.NewFromLibp2pHost(td.Host1), @@ -422,6 +425,7 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut discovery.NewLocal(td.Ds1), td.Ds1, &clientNode, + clientDealFunds, storageimpl.DealPollingInterval(0), ) require.NoError(t, err) @@ -437,6 +441,9 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr) assert.NoError(t, err) + providerDealFunds, err := funds.NewDealFunds(td.Ds2, datastore.NewKey("storage/provider/dealfunds")) + assert.NoError(t, err) + provider, err := storageimpl.NewProvider( network.NewFromLibp2pHost(td.Host2), td.Ds2, @@ -448,6 +455,7 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut providerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, storedAsk, + providerDealFunds, ) assert.NoError(t, err) diff --git a/storagemarket/types.go b/storagemarket/types.go index 710e6af2..3059b6b8 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -91,6 +91,7 @@ type MinerDeal struct { FastRetrieval bool Message string StoreID *multistore.StoreID + FundsReserved abi.TokenAmount Ref *DataRef DealID abi.DealID @@ -113,6 +114,7 @@ type ClientDeal struct { PollErrorCount uint64 FastRetrieval bool StoreID *multistore.StoreID + FundsReserved abi.TokenAmount } // StorageDeal is a local combination of a proposal and a current deal state diff --git a/storagemarket/types_cbor_gen.go b/storagemarket/types_cbor_gen.go index 0e40cc5e..b0eb8fd8 100644 --- a/storagemarket/types_cbor_gen.go +++ b/storagemarket/types_cbor_gen.go @@ -18,7 +18,7 @@ import ( var _ = xerrors.Errorf -var lengthBufClientDeal = []byte{143} +var lengthBufClientDeal = []byte{144} func (t *ClientDeal) MarshalCBOR(w io.Writer) error { if t == nil { @@ -152,6 +152,10 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { } } + // t.FundsReserved (big.Int) (struct) + if err := t.FundsReserved.MarshalCBOR(w); err != nil { + return err + } return nil } @@ -169,7 +173,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 15 { + if extra != 16 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -415,11 +419,20 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { t.StoreID = &typed } + } + // t.FundsReserved (big.Int) (struct) + + { + + if err := t.FundsReserved.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.FundsReserved: %w", err) + } + } return nil } -var lengthBufMinerDeal = []byte{143} +var lengthBufMinerDeal = []byte{144} func (t *MinerDeal) MarshalCBOR(w io.Writer) error { if t == nil { @@ -561,6 +574,11 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { } } + // t.FundsReserved (big.Int) (struct) + if err := t.FundsReserved.MarshalCBOR(w); err != nil { + return err + } + // t.Ref (storagemarket.DataRef) (struct) if err := t.Ref.MarshalCBOR(w); err != nil { return err @@ -589,7 +607,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 15 { + if extra != 16 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -793,6 +811,15 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { t.StoreID = &typed } + } + // t.FundsReserved (big.Int) (struct) + + { + + if err := t.FundsReserved.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.FundsReserved: %w", err) + } + } // t.Ref (storagemarket.DataRef) (struct)