Skip to content

Commit

Permalink
Handle re-sent deal proposals (#423)
Browse files Browse the repository at this point in the history
* Resend proposal response if we are already tracking a deal and the client resends the proposal.

* Lint fixes
  • Loading branch information
ingar authored Oct 2, 2020
1 parent f7227b2 commit 7911c1c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 0 deletions.
9 changes: 9 additions & 0 deletions shared_testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/jbenet/go-random"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

cborutil "github.com/filecoin-project/go-cbor-util"
versioning "github.com/filecoin-project/go-ds-versioning/pkg"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"

"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand Down Expand Up @@ -114,3 +117,9 @@ func GenerateCid(t *testing.T, o interface{}) cid.Cid {
assert.NoError(t, err)
return node.Cid()
}

func DatastoreAtVersion(t *testing.T, ds datastore.Batching, version versioning.VersionKey) datastore.Batching {
err := ds.Put(datastore.NewKey("/versions/current"), []byte(version))
require.NoError(t, err)
return namespace.Wrap(ds, datastore.NewKey(fmt.Sprintf("/%s", version)))
}
24 changes: 24 additions & 0 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error {
return err
}

// Check if we are already tracking this deal
var md storagemarket.MinerDeal
if err := p.deals.Get(proposalNd.Cid()).Get(&md); err == nil {
// We are already tracking this deal, for some reason it was re-proposed, perhaps because of a client restart
// this is ok, just send a response back.
return p.resendProposalResponse(s, &md)
}

var storeIDForDeal *multistore.StoreID
if proposal.Piece.TransferType != storagemarket.TTManual {
nextStoreID := p.multiStore.Next()
Expand Down Expand Up @@ -581,6 +589,22 @@ func (p *Provider) sign(ctx context.Context, data interface{}) (*crypto.Signatur
return providerutils.SignMinerData(ctx, data, p.actor, tok, p.spn.GetMinerWorkerAddress, p.spn.SignBytes)
}

func (p *Provider) resendProposalResponse(s network.StorageDealStream, md *storagemarket.MinerDeal) error {
resp := &network.Response{State: md.State, Message: md.Message, Proposal: md.ProposalCid}
sig, err := p.sign(context.TODO(), resp)
if err != nil {
return xerrors.Errorf("failed to sign response message: %w", err)
}

err = s.WriteDealResponse(network.SignedResponse{Response: *resp, Signature: sig}, p.sign)

if closeErr := s.Close(); closeErr != nil {
log.Warnf("closing connection: %v", err)
}

return err
}

func newProviderStateMachine(ds datastore.Batching, env fsm.Environment, notifier fsm.Notifier, storageMigrations versioning.VersionedMigrationList, target versioning.VersionKey) (fsm.Group, func(context.Context) error, error) {
return versionedfsm.NewVersionedFSM(ds, fsm.Parameters{
Environment: env,
Expand Down
79 changes: 79 additions & 0 deletions storagemarket/impl/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,82 @@ func TestProvider_Migrations(t *testing.T) {
require.Equal(t, expectedDeal, deal)
}
}

func TestHandleDealStream(t *testing.T) {
t.Run("handles cases where the proposal is already being tracked", func(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
deps := dependencies.NewDependenciesWithTestData(t, ctx, shared_testutil.NewLibp2pTestData(ctx, t), testnodes.NewStorageMarketState(), "", testnodes.DelayFakeCommonNode{})
var providerDs datastore.Batching = namespace.Wrap(deps.TestData.Ds1, datastore.NewKey("/deals/provider"))
namespaced := shared_testutil.DatastoreAtVersion(t, providerDs, "1")

proposal := shared_testutil.MakeTestClientDealProposal()
proposalNd, err := cborutil.AsIpld(proposal)
require.NoError(t, err)
payloadCid := shared_testutil.GenerateCids(1)[0]
dataRef := &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync,
Root: payloadCid,
}

now := time.Now()
creationTime := cbg.CborTime(time.Unix(0, now.UnixNano()).UTC())
timeBuf := new(bytes.Buffer)
err = creationTime.MarshalCBOR(timeBuf)
require.NoError(t, err)
err = cborutil.ReadCborRPC(timeBuf, &creationTime)
require.NoError(t, err)
deal := storagemarket.MinerDeal{
ClientDealProposal: *proposal,
ProposalCid: proposalNd.Cid(),
State: storagemarket.StorageDealTransferring,
Ref: dataRef,
}

// jam a miner state in
buf := new(bytes.Buffer)
err = deal.MarshalCBOR(buf)
require.NoError(t, err)
err = namespaced.Put(datastore.NewKey(deal.ProposalCid.String()), buf.Bytes())
require.NoError(t, err)

provider, err := storageimpl.NewProvider(
network.NewFromLibp2pHost(deps.TestData.Host2, network.RetryParameters(0, 0, 0)),
providerDs,
deps.Fs,
deps.TestData.MultiStore2,
deps.PieceStore,
deps.DTProvider,
deps.ProviderNode,
deps.ProviderAddr,
abi.RegisteredSealProof_StackedDrg2KiBV1,
deps.StoredAsk,
deps.ProviderDealFunds,
)
require.NoError(t, err)

impl := provider.(*storageimpl.Provider)
shared_testutil.StartAndWaitForReady(ctx, t, impl)

var responseWriteCount int
s := shared_testutil.NewTestStorageDealStream(shared_testutil.TestStorageDealStreamParams{
ProposalReader: func() (network.Proposal, error) {
return network.Proposal{
DealProposal: proposal,
Piece: dataRef,
FastRetrieval: false,
}, nil
},
ResponseWriter: func(response network.SignedResponse, resigningFunc network.ResigningFunc) error {
responseWriteCount += 1
return nil
},
})

// Send a deal proposal for a cid we are already tracking
impl.HandleDealStream(s)

require.Equal(t, 1, responseWriteCount)
})
}

0 comments on commit 7911c1c

Please sign in to comment.