Skip to content

Commit

Permalink
Add more states in Client and Provider FSM representing async ops:
Browse files Browse the repository at this point in the history
- Waiting for storage market funds to appear
- Waiting for deals to be published
  • Loading branch information
ingar committed Apr 21, 2020
1 parent ff4afc5 commit fe29b07
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 242 deletions.
23 changes: 22 additions & 1 deletion storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -277,7 +278,27 @@ func (c *Client) GetPaymentEscrow(ctx context.Context, addr address.Address) (st
}

func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amount abi.TokenAmount) error {
return c.node.AddFunds(ctx, addr, amount)
done := make(chan error)

mcid, err := c.node.AddFunds(ctx, addr, amount)
if err != nil {
return err
}

err = c.node.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
}
return nil
})

if err != nil {
return err
}

return <-done
}

type clientDealEnvironment struct {
Expand Down
25 changes: 16 additions & 9 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@ import (
// ClientEvents are the events that can happen in a storage client
var ClientEvents = fsm.Events{
fsm.Event(storagemarket.ClientEventOpen).
From(storagemarket.StorageDealUnknown).ToNoChange(),
From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealEnsureClientFunds),
fsm.Event(storagemarket.ClientEventFundingInitiated).
From(storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealClientFunding).
Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error {
deal.AddFundsCid = mcid
return nil
}),
fsm.Event(storagemarket.ClientEventEnsureFundsFailed).
From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealFailing).
From(storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventFundsEnsured).
From(storagemarket.StorageDealUnknown).To(storagemarket.StorageDealFundsEnsured),
FromMany(storagemarket.StorageDealEnsureClientFunds, storagemarket.StorageDealClientFunding).To(storagemarket.StorageDealFundsEnsured),
fsm.Event(storagemarket.ClientEventWriteProposalFailed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
Expand Down Expand Up @@ -97,10 +103,11 @@ var ClientEvents = fsm.Events{

// ClientStateEntryFuncs are the handlers for different states in a storage client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealUnknown: EnsureFunds,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds,
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
}
36 changes: 29 additions & 7 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package clientstates

import (
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
Expand All @@ -24,19 +26,39 @@ type ClientDealEnvironment interface {
// ClientStateEntryFunc is the type for all state entry functions on a storage client
type ClientStateEntryFunc func(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error

// EnsureFunds attempts to ensure the client has enough funds for the deal being proposed
func EnsureFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
tok, _, err := environment.Node().GetChainHead(ctx.Context())
// EnsureClientFunds attempts to ensure the client has enough funds for the deal being proposed
func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
node := environment.Node()

tok, _, err := node.GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err)
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("acquiring chain head: %w", err))
}

if err := environment.Node().EnsureFunds(
ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok); err != nil {
mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok)

if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err)
}

return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
// if no message was sent, and there was no error, funds were already available
if mcid == cid.Undef {
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
}

return ctx.Trigger(storagemarket.ClientEventFundingInitiated, mcid)
}

// WaitForFunding waits for an AddFunds message to appear on the chain
func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
node := environment.Node()

return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
}
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %w", code))
})
}

// ProposeDeal sends the deal proposal to the provider
Expand Down
72 changes: 36 additions & 36 deletions storagemarket/impl/clientstates/cliest_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestEnsureFunds(t *testing.T) {
eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents)
require.NoError(t, err)
clientDealProposal := tut.MakeTestClientDealProposal()
runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureFunds, storagemarket.StorageDealUnknown, clientDealProposal)
runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.EnsureClientFunds, storagemarket.StorageDealClientFunding, clientDealProposal)

node := func(ensureFundsErr error) storagemarket.StorageClientNode {
return &testnodes.FakeClientNode{
Expand All @@ -36,16 +36,16 @@ func TestEnsureFunds(t *testing.T) {
},
}
}
t.Run("EnsureFunds succeeds", func(t *testing.T) {
t.Run("EnsureClientFunds succeeds", func(t *testing.T) {
runEnsureFunds(t, node(nil), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFundsEnsured)
require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State)
})
})

t.Run("EnsureFunds fails", func(t *testing.T) {
t.Run("EnsureClientFunds fails", func(t *testing.T) {
runEnsureFunds(t, node(errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, deal.Message, "adding market funds failed: Something went wrong")
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "adding market funds failed: Something went wrong", deal.Message)
})
})
}
Expand Down Expand Up @@ -73,21 +73,21 @@ func TestProposeDeal(t *testing.T) {

t.Run("succeeds", func(t *testing.T) {
runProposeDeal(t, node(), nil, dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealValidating)
require.Equal(t, storagemarket.StorageDealValidating, deal.State)
})
})

t.Run("deal stream lookup fails", func(t *testing.T) {
runProposeDeal(t, node(), errors.New("deal stream not found"), nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, deal.Message, "miner connection error: deal stream not found")
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "miner connection error: deal stream not found", deal.Message)
})
})

t.Run("write proposal fails fails", func(t *testing.T) {
runProposeDeal(t, node(), nil, dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "sending proposal to storage provider failed: write proposal failed")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message)
})
})
}
Expand Down Expand Up @@ -128,23 +128,23 @@ func TestVerifyResponse(t *testing.T) {
Signature: tut.MakeTestSignature(),
}))
runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealProposalAccepted)
require.Equal(t, deal.PublishMessage, publishMessage)
require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State)
require.Equal(t, publishMessage, deal.PublishMessage)
})
})

t.Run("deal stream lookup fails", func(t *testing.T) {
dealStreamErr := errors.New("deal stream not found")
runVerifyResponse(t, node(false), dealStreamErr, dealStream(nil), nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, deal.Message, "miner connection error: deal stream not found")
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "miner connection error: deal stream not found", deal.Message)
})
})

t.Run("read response fails", func(t *testing.T) {
runVerifyResponse(t, node(false), nil, dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error reading Response message: read response failed")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error reading Response message: read response failed", deal.Message)
})
})

Expand All @@ -159,8 +159,8 @@ func TestVerifyResponse(t *testing.T) {
}))
failToVerifyNode := node(true)
runVerifyResponse(t, failToVerifyNode, nil, stream, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, deal.Message, "unable to verify signature on deal response")
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "unable to verify signature on deal response", deal.Message)
})
})

Expand All @@ -174,7 +174,7 @@ func TestVerifyResponse(t *testing.T) {
Signature: tut.MakeTestSignature(),
}))
runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message)
})
})
Expand All @@ -191,8 +191,8 @@ func TestVerifyResponse(t *testing.T) {
}))
expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected)
runVerifyResponse(t, node(false), nil, stream, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealFailing)
require.Equal(t, expErr, deal.Message)
require.Equal(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, deal.Message, expErr)
})
})

Expand All @@ -207,8 +207,8 @@ func TestVerifyResponse(t *testing.T) {
}))
closeStreamErr := errors.New("something went wrong")
runVerifyResponse(t, node(false), nil, stream, closeStreamErr, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error attempting to close stream: something went wrong")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error attempting to close stream: something went wrong", deal.Message)
})
})

Expand All @@ -233,15 +233,15 @@ func TestValidateDealPublished(t *testing.T) {

t.Run("succeeds", func(t *testing.T) {
runValidateDealPublished(t, node(abi.DealID(5), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealSealing)
require.Equal(t, deal.DealID, abi.DealID(5))
require.Equal(t, storagemarket.StorageDealSealing, deal.State)
require.Equal(t, abi.DealID(5), deal.DealID)
})
})

t.Run("fails", func(t *testing.T) {
runValidateDealPublished(t, node(abi.DealID(5), errors.New("Something went wrong")), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error validating deal published: Something went wrong")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error validating deal published: Something went wrong", deal.Message)
})
})
}
Expand All @@ -265,21 +265,21 @@ func TestVerifyDealActivated(t *testing.T) {

t.Run("succeeds", func(t *testing.T) {
runVerifyDealActivated(t, node(nil, nil), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealActive)
require.Equal(t, storagemarket.StorageDealActive, deal.State)
})
})

t.Run("fails synchronously", func(t *testing.T) {
runVerifyDealActivated(t, node(errors.New("Something went wrong"), nil), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error in deal activation: Something went wrong")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error in deal activation: Something went wrong", deal.Message)
})
})

t.Run("fails asynchronously", func(t *testing.T) {
runVerifyDealActivated(t, node(nil, errors.New("Something went wrong later")), nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error in deal activation: Something went wrong later")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error in deal activation: Something went wrong later", deal.Message)
})
})
}
Expand All @@ -293,14 +293,14 @@ func TestFailDeal(t *testing.T) {

t.Run("able to close stream", func(t *testing.T) {
runFailDeal(t, nil, nil, nil, nil, func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, storagemarket.StorageDealError, deal.State)
})
})

t.Run("unable to close stream", func(t *testing.T) {
runFailDeal(t, nil, nil, nil, errors.New("unable to close"), func(deal storagemarket.ClientDeal) {
require.Equal(t, deal.State, storagemarket.StorageDealError)
require.Equal(t, deal.Message, "error attempting to close stream: unable to close")
require.Equal(t, storagemarket.StorageDealError, deal.State)
require.Equal(t, "error attempting to close stream: unable to close", deal.Message)
})
})
}
Expand Down
23 changes: 22 additions & 1 deletion storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -242,7 +243,27 @@ func (p *Provider) ListDeals(ctx context.Context) ([]storagemarket.StorageDeal,
}

func (p *Provider) AddStorageCollateral(ctx context.Context, amount abi.TokenAmount) error {
return p.spn.AddFunds(ctx, p.actor, amount)
done := make(chan error)

mcid, err := p.spn.AddFunds(ctx, p.actor, amount)
if err != nil {
return err
}

err = p.spn.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
}
return nil
})

if err != nil {
return err
}

return <-done
}

func (p *Provider) GetStorageCollateral(ctx context.Context) (storagemarket.Balance, error) {
Expand Down
Loading

0 comments on commit fe29b07

Please sign in to comment.