Skip to content

Commit

Permalink
feat(requestvalidation): use getsync in validation
Browse files Browse the repository at this point in the history
fix a race condition where deal status in request validation could fall behind actual events
dispatched
  • Loading branch information
hannahhoward committed Aug 14, 2020
1 parent 1e15fa9 commit b60fb2d
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 200 deletions.
6 changes: 0 additions & 6 deletions retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
stormkt "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
stornet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket/testnodes"
Expand Down Expand Up @@ -261,8 +259,6 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness {
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)
rv1 := requestvalidation.NewUnifiedRequestValidator(nil, statestore.New(td.Ds1))
require.NoError(t, dt1.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, rv1))

peerResolver := discovery.NewLocal(td.Ds1)

Expand All @@ -287,8 +283,6 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness {
require.NoError(t, err)
err = dt2.Start(ctx)
require.NoError(t, err)
rv2 := requestvalidation.NewUnifiedRequestValidator(statestore.New(td.Ds2), nil)
require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, rv2))
storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr)
require.NoError(t, err)
providerDealFunds, err := funds.NewDealFunds(td.Ds1, datastore.NewKey("storage/provider/dealfunds"))
Expand Down
64 changes: 5 additions & 59 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -112,6 +110,11 @@ func NewClient(
// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines))

err = dataTransfer.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, requestvalidation.NewUnifiedRequestValidator(nil, &clientPullDeals{c}))
if err != nil {
return nil, err
}

err = dataTransfer.RegisterTransportConfigurer(&requestvalidation.StorageDataTransferVoucher{}, dtutils.TransportConfigurer(&clientStoreGetter{c}))
if err != nil {
return nil, err
Expand Down Expand Up @@ -561,63 +564,6 @@ func clientDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
return nil
}

// -------
// clientDealEnvironment
// -------

type clientDealEnvironment struct {
c *Client
}

func (c *clientDealEnvironment) NewDealStream(ctx context.Context, p peer.ID) (network.StorageDealStream, error) {
return c.c.net.NewDealStream(ctx, p)
}

func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
return c.c.node
}

func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
_, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector)
return err
}

func (c *clientDealEnvironment) GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error) {
return c.c.GetProviderDealState(ctx, proposalCid)
}

func (c *clientDealEnvironment) PollingInterval() time.Duration {
return c.c.pollingInterval
}

func (c *clientDealEnvironment) DealFunds() funds.DealFunds {
return c.c.dealFunds
}

type clientStoreGetter struct {
c *Client
}

func (csg *clientStoreGetter) Get(proposalCid cid.Cid) (*multistore.Store, error) {
var deal storagemarket.ClientDeal
err := csg.c.statemachines.Get(proposalCid).Get(&deal)
if err != nil {
return nil, err
}
if deal.StoreID == nil {
return nil, nil
}
return csg.c.multiStore.Get(*deal.StoreID)
}

func (c *clientDealEnvironment) TagPeer(peer peer.ID, tag string) {
c.c.net.TagPeer(peer, tag)
}

func (c *clientDealEnvironment) UntagPeer(peer peer.ID, tag string) {
c.c.net.UntagPeer(peer, tag)
}

// ClientFSMParameterSpec is a valid set of parameters for a client deal FSM - used in doc generation
var ClientFSMParameterSpec = fsm.Parameters{
Environment: &clientDealEnvironment{},
Expand Down
82 changes: 82 additions & 0 deletions storagemarket/impl/client_environments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package storageimpl

import (
"context"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-multistore"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
)

// -------
// clientDealEnvironment
// -------

type clientDealEnvironment struct {
c *Client
}

func (c *clientDealEnvironment) NewDealStream(ctx context.Context, p peer.ID) (network.StorageDealStream, error) {
return c.c.net.NewDealStream(ctx, p)
}

func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
return c.c.node
}

func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
_, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector)
return err
}

func (c *clientDealEnvironment) GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error) {
return c.c.GetProviderDealState(ctx, proposalCid)
}

func (c *clientDealEnvironment) PollingInterval() time.Duration {
return c.c.pollingInterval
}

func (c *clientDealEnvironment) DealFunds() funds.DealFunds {
return c.c.dealFunds
}

type clientStoreGetter struct {
c *Client
}

func (csg *clientStoreGetter) Get(proposalCid cid.Cid) (*multistore.Store, error) {
var deal storagemarket.ClientDeal
err := csg.c.statemachines.Get(proposalCid).Get(&deal)
if err != nil {
return nil, err
}
if deal.StoreID == nil {
return nil, nil
}
return csg.c.multiStore.Get(*deal.StoreID)
}

func (c *clientDealEnvironment) TagPeer(peer peer.ID, tag string) {
c.c.net.TagPeer(peer, tag)
}

func (c *clientDealEnvironment) UntagPeer(peer peer.ID, tag string) {
c.c.net.UntagPeer(peer, tag)
}

type clientPullDeals struct {
c *Client
}

func (cpd *clientPullDeals) Get(proposalCid cid.Cid) (storagemarket.ClientDeal, error) {
var deal storagemarket.ClientDeal
err := cpd.c.statemachines.GetSync(context.TODO(), proposalCid, &deal)
return deal, err
}
117 changes: 5 additions & 112 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package storageimpl

import (
"context"
"errors"
"io"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -144,6 +141,11 @@ func NewProvider(net network.StorageMarketNetwork,
// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))

err = dataTransfer.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, requestvalidation.NewUnifiedRequestValidator(&providerPushDeals{h}, nil))
if err != nil {
return nil, err
}

err = dataTransfer.RegisterTransportConfigurer(&requestvalidation.StorageDataTransferVoucher{}, dtutils.TransportConfigurer(&providerStoreGetter{h}))
if err != nil {
return nil, err
Expand Down Expand Up @@ -586,115 +588,6 @@ func providerDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
return nil
}

// -------
// providerDealEnvironment
// -------

type providerDealEnvironment struct {
p *Provider
}

func (p *providerDealEnvironment) Address() address.Address {
return p.p.actor
}

func (p *providerDealEnvironment) Node() storagemarket.StorageProviderNode {
return p.p.spn
}

func (p *providerDealEnvironment) Ask() storagemarket.StorageAsk {
sask := p.p.storedAsk.GetAsk()
if sask == nil {
return storagemarket.StorageAskUndefined
}
return *sask.Ask
}

func (p *providerDealEnvironment) DeleteStore(storeID multistore.StoreID) error {
return p.p.multiStore.Delete(storeID)
}

func (p *providerDealEnvironment) GeneratePieceCommitmentToFile(storeID *multistore.StoreID, payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) {
if p.p.universalRetrievalEnabled {
return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector, storeID)
}
pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector, storeID)
return pieceCid, piecePath, filestore.Path(""), err
}

func (p *providerDealEnvironment) FileStore() filestore.FileStore {
return p.p.fs
}

func (p *providerDealEnvironment) PieceStore() piecestore.PieceStore {
return p.p.pieceStore
}

func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp *network.Response) error {
s, err := p.p.conns.DealStream(resp.Proposal)
if err != nil {
return xerrors.Errorf("couldn't send response: %w", err)
}

sig, err := p.p.sign(ctx, resp)
if err != nil {
return xerrors.Errorf("failed to sign response message: %w", err)
}

signedResponse := network.SignedResponse{
Response: *resp,
Signature: sig,
}

err = s.WriteDealResponse(signedResponse)
if err != nil {
// Assume client disconnected
_ = p.p.conns.Disconnect(resp.Proposal)
}
return err
}

func (p *providerDealEnvironment) Disconnect(proposalCid cid.Cid) error {
return p.p.conns.Disconnect(proposalCid)
}

func (p *providerDealEnvironment) RunCustomDecisionLogic(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
if p.p.customDealDeciderFunc == nil {
return true, "", nil
}
return p.p.customDealDeciderFunc(ctx, deal)
}

func (p *providerDealEnvironment) DealFunds() funds.DealFunds {
return p.p.dealFunds
}

func (p *providerDealEnvironment) TagPeer(id peer.ID, s string) {
p.p.net.TagPeer(id, s)
}

func (p *providerDealEnvironment) UntagPeer(id peer.ID, s string) {
p.p.net.UntagPeer(id, s)
}

var _ providerstates.ProviderDealEnvironment = &providerDealEnvironment{}

type providerStoreGetter struct {
p *Provider
}

func (psg *providerStoreGetter) Get(proposalCid cid.Cid) (*multistore.Store, error) {
var deal storagemarket.MinerDeal
err := psg.p.deals.Get(proposalCid).Get(&deal)
if err != nil {
return nil, err
}
if deal.StoreID == nil {
return nil, errors.New("No store for this deal")
}
return psg.p.multiStore.Get(*deal.StoreID)
}

// ProviderFSMParameterSpec is a valid set of parameters for a provider FSM - used in doc generation
var ProviderFSMParameterSpec = fsm.Parameters{
Environment: &providerDealEnvironment{},
Expand Down
Loading

0 comments on commit b60fb2d

Please sign in to comment.