Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce deal proposal protocol v1.2.1 #1185

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions storagemarket/deal_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"runtime/debug"
"time"

"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -80,7 +81,17 @@ func (p *Provider) runDeal(deal *types.ProviderDealState, dh *dealHandler) {
p.saveDealToDB(dh.Publisher, deal)
}

func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) *dealMakingError {
func (p *Provider) execDeal(deal *smtypes.ProviderDealState, dh *dealHandler) (dmerr *dealMakingError) {
// Capture any panic as a manually retryable error
defer func() {
if err := recover(); err != nil {
dmerr = &dealMakingError{
error: fmt.Errorf("Caught panic in deal execution: %s\n%s", err, debug.Stack()),
retry: smtypes.DealRetryManual,
}
}
}()

// If the deal has not yet been handed off to the sealer
if deal.Checkpoint < dealcheckpoints.AddedPiece {
transferType := "downloaded file"
Expand Down Expand Up @@ -363,19 +374,29 @@ func (p *Provider) transferAndVerify(dh *dealHandler, pub event.Emitter, deal *s
return p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred)
}

const OneGib = 1024 * 1024 * 1024

func (p *Provider) waitForTransferFinish(ctx context.Context, handler transport.Handler, pub event.Emitter, deal *types.ProviderDealState) error {
defer handler.Close()
defer p.transfers.complete(deal.DealUuid)

// log transfer progress to the deal log every 10%
var lastOutputPct int64
// log transfer progress to the deal log every 10% or every GiB
var lastOutput int64
logTransferProgress := func(received int64) {
pct := (100 * received) / int64(deal.Transfer.Size)
outputPct := pct / 10
if outputPct != lastOutputPct {
lastOutputPct = outputPct
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
"deal size", deal.Transfer.Size, "percent complete", pct)
if deal.Transfer.Size > 0 {
pct := (100 * received) / int64(deal.Transfer.Size)
outputPct := pct / 10
if outputPct != lastOutput {
lastOutput = outputPct
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received,
"deal size", deal.Transfer.Size, "percent complete", pct)
}
} else {
gib := received / OneGib
if gib != lastOutput {
lastOutput = gib
p.dealLogger.Infow(deal.DealUuid, "transfer progress", "bytes received", received)
}
}
}

Expand Down
23 changes: 19 additions & 4 deletions storagemarket/lp2pimpl/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
var log = logging.Logger("boost-net")
var propLog = logging.Logger("boost-prop")

const DealProtocolID = "/fil/storage/mk/1.2.0"
const DealProtocolv120ID = "/fil/storage/mk/1.2.0"
const DealProtocolv121ID = "/fil/storage/mk/1.2.1"
const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0"
const providerReadDeadline = 10 * time.Second
const providerWriteDeadline = 10 * time.Second
Expand Down Expand Up @@ -57,7 +58,7 @@ func (c *DealClient) SendDealProposal(ctx context.Context, id peer.ID, params ty
log.Debugw("send deal proposal", "id", params.DealUUID, "provider-peer", id)

// Create a libp2p stream to the provider
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolID})
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{DealProtocolv121ID, DealProtocolv120ID})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,12 +170,26 @@ func NewDealProvider(h host.Host, prov *storagemarket.Provider, fullNodeApi v1ap

func (p *DealProvider) Start(ctx context.Context) {
p.ctx = ctx
p.host.SetStreamHandler(DealProtocolID, p.handleNewDealStream)

// Note that the handling for deal protocol v1.2.0 and v1.2.1 is the same.
// Deal protocol v1.2.1 has a couple of new fields: SkipIPNIAnnounce and
// RemoveUnsealedCopy.
// If a client that supports deal protocol v1.2.0 sends a request to a
// boostd server that supports deal protocol v1.2.1, the DealParams struct
// will be missing these new fields.
// When the DealParams struct is unmarshalled the missing fields will be
// set to false, which maintains the previous behaviour:
// - SkipIPNIAnnounce=false: announce deal to IPNI
// - RemoveUnsealedCopy=false: keep unsealed copy of deal data
p.host.SetStreamHandler(DealProtocolv121ID, p.handleNewDealStream)
p.host.SetStreamHandler(DealProtocolv120ID, p.handleNewDealStream)

p.host.SetStreamHandler(DealStatusV12ProtocolID, p.handleNewDealStatusStream)
}

func (p *DealProvider) Stop() {
p.host.RemoveStreamHandler(DealProtocolID)
p.host.RemoveStreamHandler(DealProtocolv121ID)
p.host.RemoveStreamHandler(DealProtocolv120ID)
p.host.RemoveStreamHandler(DealStatusV12ProtocolID)
}

Expand Down
68 changes: 68 additions & 0 deletions storagemarket/lp2pimpl/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package lp2pimpl

import (
"bytes"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"testing"
)

// TestDealParamsMissingFields verifies that when the client sends a v1.2.0
// DealParams struct, the server unmarshalls it into a v1.2.1 DealParams struct
// with all missing boolean fields set to false.
func TestDealParamsMissingFields(t *testing.T) {
label, err := market.NewLabelFromString("label")
require.NoError(t, err)
dpv120 := types.DealParamsV120{
DealUUID: uuid.New(),
IsOffline: false,
ClientDealProposal: market.ClientDealProposal{
Proposal: market.DealProposal{
PieceCID: testutil.GenerateCid(),
Client: address.TestAddress,
Provider: address.TestAddress2,
Label: label,
StoragePricePerEpoch: abi.NewTokenAmount(1),
ProviderCollateral: abi.NewTokenAmount(2),
ClientCollateral: abi.NewTokenAmount(3),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("sig"),
},
},
DealDataRoot: testutil.GenerateCid(),
Transfer: types.Transfer{
Type: "http",
ClientID: "1234",
Params: []byte("params"),
Size: 5678,
},
}

var buff bytes.Buffer
err = dpv120.MarshalCBOR(&buff)
require.NoError(t, err)

var dpv121 types.DealParams
err = dpv121.UnmarshalCBOR(&buff)
require.NoError(t, err)

// Expect all fields present in both v1.2.0 and v1.2.1 to match
require.Equal(t, dpv120.DealUUID, dpv121.DealUUID)
require.Equal(t, dpv120.IsOffline, dpv121.IsOffline)
require.Equal(t, dpv120.ClientDealProposal, dpv121.ClientDealProposal)
require.Equal(t, dpv120.DealDataRoot, dpv121.DealDataRoot)
require.Equal(t, dpv120.Transfer, dpv121.Transfer)

// Expect all boolean fields not present in v1.2.0 to be false
// in v1.2.1
require.False(t, dpv121.SkipIPNIAnnounce)
require.False(t, dpv121.RemoveUnsealedCopy)
}
10 changes: 10 additions & 0 deletions storagemarket/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,16 @@ func TestOfflineDealRestartAfterManualRecoverableErrors(t *testing.T) {
onResume: func(builder *testDealBuilder) *testDeal {
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
},
}, {
name: "deal exec panic",
dbuilder: func(h *ProviderHarness) *testDeal {
// Simulate panic
return h.newDealBuilder(t, 1, withOfflineDeal()).withCommpFailing(errors.New("panic: commp panic")).build()
},
expectedErr: "Caught panic in deal execution: commp panic",
onResume: func(builder *testDealBuilder) *testDeal {
return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build()
},
}}

for _, tc := range tcs {
Expand Down
4 changes: 4 additions & 0 deletions storagemarket/smtestutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"io/ioutil"
"strings"
"sync"

"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -167,6 +168,9 @@ func (mb *MinerStubBuilder) SetupCommp(blocking bool) *MinerStubBuilder {

func (mb *MinerStubBuilder) SetupCommpFailure(err error) {
mb.stub.MockCommpCalculator.EXPECT().ComputeDataCid(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
if strings.HasPrefix(err.Error(), "panic: ") {
panic(err.Error()[len("panic: "):])
}
return abi.PieceInfo{}, err
})
}
Expand Down
15 changes: 15 additions & 0 deletions storagemarket/types/legacy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package types

import (
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
)

type DealParamsV120 struct {
DealUUID uuid.UUID
IsOffline bool
ClientDealProposal market.ClientDealProposal
DealDataRoot cid.Cid
Transfer Transfer
}
2 changes: 1 addition & 1 deletion storagemarket/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/ipfs/go-cid"
)

//go:generate cbor-gen-for --map-encoding StorageAsk DealParams Transfer DealResponse DealStatusRequest DealStatusResponse DealStatus
//go:generate cbor-gen-for --map-encoding StorageAsk DealParamsV120 DealParams Transfer DealResponse DealStatusRequest DealStatusResponse DealStatus
//go:generate go run github.com/golang/mock/mockgen -destination=mock_types/mocks.go -package=mock_types . PieceAdder,CommpCalculator,DealPublisher,ChainDealManager,IndexProvider

// StorageAsk defines the parameters by which a miner will choose to accept or
Expand Down
Loading