Skip to content

Commit

Permalink
feat: introduce deal proposal protocol v1.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Feb 15, 2023
1 parent 29dc7d3 commit 851f86e
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 13 deletions.
39 changes: 30 additions & 9 deletions storagemarket/deal_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"runtime/debug"
"time"

"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -80,7 +81,17 @@ func (p *Provider) runDeal(deal *types.ProviderDealState, dh *dealHandler) {
p.saveDealToDB(dh.Publisher, deal)
}

func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) *dealMakingError {
func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) (dmerr *dealMakingError) {
// Capture any panic as a manually retryable error
defer func() {
if err := recover(); err != nil {
dmerr = &dealMakingError{
error: fmt.Errorf("Caught panic in deal execution: %s\n%s", err, debug.Stack()),
retry: smtypes.DealRetryManual,
}
}
}()

// If the deal has not yet been handed off to the sealer
if deal.Checkpoint < dealcheckpoints.AddedPiece {
transferType := "downloaded file"
Expand Down Expand Up @@ -363,19 +374,29 @@ func (p *Provider) transferAndVerify(dh *dealHandler, pub event.Emitter, deal *s
return p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred)
}

const OneGib = 1024 * 1024 * 1024

func (p *Provider) waitForTransferFinish(ctx context.Context, handler transport.Handler, pub event.Emitter, deal *types.ProviderDealState) error {
defer handler.Close()
defer p.transfers.complete(deal.DealUuid)

// log transfer progress to the deal log every 10%
var lastOutputPct int64
// log transfer progress to the deal log every 10% or every GiB
var lastOutput int64
logTransferProgress := func(received int64) {
pct := (100 * received) / int64(deal.Transfer.Size)
outputPct := pct / 10
if outputPct != lastOutputPct {
lastOutputPct = outputPct
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
"deal size", deal.Transfer.Size, "percent complete", pct)
if deal.Transfer.Size > 0 {
pct := (100 * received) / int64(deal.Transfer.Size)
outputPct := pct / 10
if outputPct != lastOutput {
lastOutput = outputPct
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
"deal size", deal.Transfer.Size, "percent complete", pct)
}
} else {
gib := received / OneGib
if gib != lastOutput {
lastOutput = gib
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received)
}
}
}

Expand Down
22 changes: 18 additions & 4 deletions storagemarket/lp2pimpl/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
var log = logging.Logger("boost-net")
var propLog = logging.Logger("boost-prop")

const DealProtocolID = "/fil/storage/mk/1.2.0"
const DealProtocolv120ID = "/fil/storage/mk/1.2.0"
const DealProtocolv121ID = "/fil/storage/mk/1.2.1"
const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0"
const providerReadDeadline = 10 * time.Second
const providerWriteDeadline = 10 * time.Second
Expand Down Expand Up @@ -57,7 +58,7 @@ func (c *DealClient) SendDealProposal(ctx context.Context, id peer.ID, params ty
log.Debugw("send deal proposal", "id", params.DealUUID, "provider-peer", id)

// Create a libp2p stream to the provider
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolID})
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolv121ID, DealProtocolv120ID})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,12 +170,25 @@ func NewDealProvider(h host.Host, prov *storagemarket.Provider, fullNodeApi v1ap

func (p *DealProvider) Start(ctx context.Context) {
p.ctx = ctx
p.host.SetStreamHandler(DealProtocolID, p.handleNewDealStream)

// Note that the handling for deal protocol v1.2.0 and v1.2.1 is the same.
// Deal protocol v1.2.1 has a couple of new fields: SkipIPNIAnnounce and
// RemoveUnsealedCopy.
// If a client that supports deal protocol v1.2.0 sends a request to a
// boostd server that supports deal protocol v1.2.1, the DealParams struct
// will be missing these new fields.
// When the DealParams struct is unmarshalled the missing fields will be
// set to false, which maintains the previous behaviour:
// - SkipIPNIAnnounce=false: announce deal to IPNI
// - RemoveUnsealedCopy=false: keep unsealed copy of deal data
p.host.SetStreamHandler(DealProtocolv121ID, p.handleNewDealStream)
p.host.SetStreamHandler(DealProtocolv120ID, p.handleNewDealStream)

p.host.SetStreamHandler(DealStatusV12ProtocolID, p.handleNewDealStatusStream)
}

func (p *DealProvider) Stop() {
p.host.RemoveStreamHandler(DealProtocolID)
p.host.RemoveStreamHandler(DealProtocolv120ID)
p.host.RemoveStreamHandler(DealStatusV12ProtocolID)
}

Expand Down
68 changes: 68 additions & 0 deletions storagemarket/lp2pimpl/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package lp2pimpl

import (
"bytes"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"testing"
)

// TestDealParamsMissingFields verifies that when the client sends a v1.2.0
// DealParams struct, the server unmarshalls it into a v1.2.1 DealParams struct
// with all missing boolean fields set to false.
func TestDealParamsMissingFields(t *testing.T) {
label, err := market.NewLabelFromString("label")
require.NoError(t, err)
dpv120 := types.DealParamsV120{
DealUUID: uuid.New(),
IsOffline: false,
ClientDealProposal: market.ClientDealProposal{
Proposal: market.DealProposal{
PieceCID: testutil.GenerateCid(),
Client: address.TestAddress,
Provider: address.TestAddress2,
Label: label,
StoragePricePerEpoch: abi.NewTokenAmount(1),
ProviderCollateral: abi.NewTokenAmount(2),
ClientCollateral: abi.NewTokenAmount(3),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("sig"),
},
},
DealDataRoot: testutil.GenerateCid(),
Transfer: types.Transfer{
Type: "http",
ClientID: "1234",
Params: []byte("params"),
Size: 5678,
},
}

var buff bytes.Buffer
err = dpv120.MarshalCBOR(&buff)
require.NoError(t, err)

var dpv121 types.DealParams
err = dpv121.UnmarshalCBOR(&buff)
require.NoError(t, err)

// Expect all fields present in both v1.2.0 and v1.2.1 to match
require.Equal(t, dpv120.DealUUID, dpv121.DealUUID)
require.Equal(t, dpv120.IsOffline, dpv121.IsOffline)
require.Equal(t, dpv120.ClientDealProposal, dpv121.ClientDealProposal)
require.Equal(t, dpv120.DealDataRoot, dpv121.DealDataRoot)
require.Equal(t, dpv120.Transfer, dpv121.Transfer)

// Expect all boolean fields not present in v1.2.0 to be false
// in v1.2.1
require.False(t, dpv121.SkipIPNIAnnounce)
require.False(t, dpv121.RemoveUnsealedCopy)
}
10 changes: 10 additions & 0 deletions storagemarket/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,16 @@ func TestOfflineDealRestartAfterManualRecoverableErrors(t *testing.T) {
onResume: func(builder *testDealBuilder) *testDeal {
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
},
}, {
name: "deal exec panic",
dbuilder: func(h *ProviderHarness) *testDeal {
// Simulate panic
return h.newDealBuilder(t, 1, withOfflineDeal()).withCommpFailing(errors.New("panic: commp panic")).build()
},
expectedErr: "Caught panic in deal execution: commp panic",
onResume: func(builder *testDealBuilder) *testDeal {
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
},
}}

for _, tc := range tcs {
Expand Down
4 changes: 4 additions & 0 deletions storagemarket/smtestutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"io/ioutil"
"strings"
"sync"

"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -167,6 +168,9 @@ func (mb *MinerStubBuilder) SetupCommp(blocking bool) *MinerStubBuilder {

func (mb *MinerStubBuilder) SetupCommpFailure(err error) {
mb.stub.MockCommpCalculator.EXPECT().ComputeDataCid(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
if strings.HasPrefix(err.Error(), "panic: ") {
panic(err.Error()[len("panic: "):])
}
return abi.PieceInfo{}, err
})
}
Expand Down
17 changes: 17 additions & 0 deletions storagemarket/types/legacy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package types

import (
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
)

//go:generate cbor-gen-for --map-encoding DealParamsV120

type DealParamsV120 struct {
DealUUID uuid.UUID
IsOffline bool
ClientDealProposal market.ClientDealProposal
DealDataRoot cid.Cid
Transfer Transfer
}
Loading

0 comments on commit 851f86e

Please sign in to comment.