Skip to content

Commit

Permalink
fix: ask sign & storegeprovider response error
Browse files Browse the repository at this point in the history
  • Loading branch information
ta0li committed Nov 9, 2021
1 parent 67e8f5d commit 5e66edc
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 40 deletions.
4 changes: 0 additions & 4 deletions cli/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ var setAskCmd = &cli.Command{
return nil
}

// TODO: 判断miner在矿池中是否存在

ssize, err := api.ActorSectorSize(ctx, maddr)
if err != nil {
return err
Expand Down Expand Up @@ -283,8 +281,6 @@ var getAskCmd = &cli.Command{
return nil
}

// TODO: 判断miner在矿池中是否存在

sask, err := smapi.MarketGetAsk(ctx, maddr)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/venus-auth/log"
"io"
"os"
"sort"
Expand Down Expand Up @@ -52,7 +53,6 @@ import (

"github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/imports"
"github.com/filecoin-project/venus-market/paychmgr"
"github.com/filecoin-project/venus-market/retrievaladapter"
"github.com/filecoin-project/venus-market/storageadapter"
types2 "github.com/filecoin-project/venus-market/types"
Expand All @@ -76,7 +76,6 @@ type API struct {
fx.In

Full apiface.FullNode
PayChManager *paychmgr.Manager

SMDealClient storagemarket.StorageClient
RetDiscovery discovery.PeerResolver
Expand Down Expand Up @@ -1078,6 +1077,7 @@ func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Addre
}

info := utils.NewStorageProviderInfo(miner, mi.Worker, mi.SectorSize, p, mi.Multiaddrs)
log.Infof("info: %v", info)
ask, err := a.SMDealClient.GetAsk(ctx, info)
if err != nil {
return nil, err
Expand Down
13 changes: 8 additions & 5 deletions minermgr/minermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net/http"
"strings"
"sync"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -93,11 +94,13 @@ func (m *MinerMgrImpl) GetMinerFromVenusAuth(ctx context.Context, skip, limit in
m.lk.Lock()
m.miners = make([]address.Address, 0)
for _, val := range res {
addr, err := address.NewFromString(val.Miner)
if err == nil {
m.miners = append(m.miners, addr)
} else {
log.Errorf("miner [%s] is error", val.Miner)
if strings.Index(val.Miner, "f") == 0 || strings.Index(val.Miner, "t") == 0 {
addr, err := address.NewFromString(val.Miner)
if err == nil {
m.miners = append(m.miners, addr)
} else {
log.Errorf("miner [%s] is error", val.Miner)
}
}
}
m.lk.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions models/badger/storage_ask.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (ar *storageAskRepo) GetAsk(miner address.Address) (*storagemarket.SignedSt
key := statestore.ToKey(miner)
b, err := ar.ds.Get(key)
if err != nil {
if err == datastore.ErrNotFound {
return nil, nil
}
return nil, err
}

Expand Down
26 changes: 25 additions & 1 deletion models/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func NewClientTransferDS(ds repo.MetadataDS) repo.ClientTransferDS {
return namespace.Wrap(ds, datastore.NewKey(clientTransfer))
}

// TODO: 这里没有考虑client和server的数据表是不一样的
var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option {
if server {
commonOpts := builder.Options(
Expand Down Expand Up @@ -171,10 +172,33 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option {
return repo.FundRepo()
}))
} else {
var opts builder.Option
if len(mysqlCfg.ConnectionString) > 0 {
opts = builder.Override(new(repo.Repo), func() (repo.Repo, error) {
return mysql.InitMysql(mysqlCfg)
})
} else {
opts = builder.Options(
builder.Override(new(repo.PieceInfoDS), NewPieceInfoDs),
builder.Override(new(repo.CIDInfoDS), NewCidInfoDs),
builder.Override(new(repo.RetrievalProviderDS), NewRetrievalProviderDS),
builder.Override(new(repo.RetrievalAskDS), NewRetrievalAskDS),
builder.Override(new(repo.ProviderDealDS), NewProviderDealDS),
builder.Override(new(repo.StorageAskDS), NewStorageAskDS),
builder.Override(new(repo.PayChanDS), NewPayChanDS),
builder.Override(new(repo.FundMgrDS), NewFundMgrDS),
builder.Override(new(repo.Repo), func(fundDS repo.FundMgrDS, dealDS repo.ProviderDealDS,
paychDS repo.PayChanDS, askDS repo.StorageAskDS, retrAskDs repo.RetrievalAskDS,
pieceDs repo.PieceInfoDS, cidInfoDs repo.CIDInfoDS, retrievalDs repo.RetrievalProviderDS) (repo.Repo, error) {
return badger_models.NewBadgerRepo(fundDS, dealDS, paychDS, askDS, retrAskDs, cidInfoDs, retrievalDs)
}),
)
}
return builder.Options(
opts,
builder.Override(new(repo.PieceMetaDs), NewPieceMetaDs),
builder.Override(new(repo.MetadataDS), NewMetadataDS),
builder.Override(new(repo.FundMgrDS), NewFundMgrDS),
builder.Override(new(repo.PayChanDS), NewPayChanDS),
builder.Override(new(repo.ClientDatastore), NewClientDatastore),
builder.Override(new(ClientBlockstore), NewClientBlockstore),
builder.Override(new(repo.ClientDealsDS), NewClientDealsDS),
Expand Down
23 changes: 16 additions & 7 deletions storageadapter/deal_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package storageadapter

import (
"context"
"github.com/filecoin-project/go-state-types/exitcode"
"io"
"os"

"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
Expand All @@ -25,10 +25,13 @@ import (
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"

"github.com/filecoin-project/venus/pkg/wallet"

minermgr2 "github.com/filecoin-project/venus-market/minermgr"
"github.com/filecoin-project/venus-market/models/repo"
network2 "github.com/filecoin-project/venus-market/network"
Expand Down Expand Up @@ -231,8 +234,8 @@ func (storageDealPorcess *StorageDealProcessImpl) AcceptDeal(ctx context.Context

// TODO: RunCustomDecisionLogic ?

// Send intent to accept
err = storageDealPorcess.SendSignedResponse(ctx, &network.Response{
// DecideOnProposal
err = storageDealPorcess.SendSignedResponse(ctx, proposal.Provider, &network.Response{
State: storagemarket.StorageDealWaitingForData,
Proposal: minerDeal.ProposalCid,
})
Expand Down Expand Up @@ -602,13 +605,18 @@ func (storageDealPorcess *StorageDealProcessImpl) savePieceFile(ctx context.Cont
return nil
}

func (storageDealPorcess *StorageDealProcessImpl) SendSignedResponse(ctx context.Context, resp *network.Response) error {
func (storageDealPorcess *StorageDealProcessImpl) SendSignedResponse(ctx context.Context, mAddr address.Address, resp *network.Response) error {
s, err := storageDealPorcess.conns.DealStream(resp.Proposal)
if err != nil {
return xerrors.Errorf("couldn't send response: %w", err)
}

sig, err := storageDealPorcess.spn.Sign(ctx, resp)
respEx := &types.SignInfo{
Data: resp,
Type: wallet.MTUnknown,
Addr: mAddr,
}
sig, err := storageDealPorcess.spn.Sign(ctx, respEx)
if err != nil {
return xerrors.Errorf("failed to sign response message: %w", err)
}
Expand All @@ -618,7 +626,8 @@ func (storageDealPorcess *StorageDealProcessImpl) SendSignedResponse(ctx context
Signature: sig,
}

err = s.WriteDealResponse(signedResponse, storageDealPorcess.spn.Sign)
// TODO: review ???
err = s.WriteDealResponse(signedResponse, storageDealPorcess.spn.SignWithGivenMiner(mAddr))
if err != nil {
// Assume client disconnected
_ = storageDealPorcess.conns.Disconnect(resp.Proposal)
Expand All @@ -631,7 +640,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleReject(deal *types.Miner
deal.State = event
deal.Message = err.Error()

err = storageDealPorcess.SendSignedResponse(context.TODO(), &network.Response{
err = storageDealPorcess.SendSignedResponse(context.TODO(), deal.Proposal.Provider, &network.Response{
State: storagemarket.StorageDealFailing,
Message: deal.Message,
Proposal: deal.ProposalCid,
Expand Down
47 changes: 46 additions & 1 deletion storageadapter/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package storageadapter

import (
"context"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
Expand All @@ -13,6 +14,7 @@ import (
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
Expand Down Expand Up @@ -134,6 +136,14 @@ func (n *ProviderNodeAdapter) Sign(ctx context.Context, data interface{}) (*cryp
if err != nil {
return nil, xerrors.Errorf("couldn't get chain head: %w", err)
}

switch data.(type) {
case *types2.SignInfo:

default:
return nil, xerrors.Errorf("data type is not SignInfo")
}

info := data.(*types2.SignInfo)
msgBytes, err := cborutil.Dump(info.Data)
if err != nil {
Expand All @@ -158,6 +168,38 @@ func (n *ProviderNodeAdapter) Sign(ctx context.Context, data interface{}) (*cryp
return localSignature, nil
}

//
func (n *ProviderNodeAdapter) SignWithGivenMiner(mAddr address.Address) network.ResigningFunc {
return func(ctx context.Context, data interface{}) (*crypto.Signature, error) {
tok, _, err := n.GetChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("couldn't get chain head: %w", err)
}

msgBytes, err := cborutil.Dump(data)
if err != nil {
return nil, xerrors.Errorf("serializing: %w", err)
}

worker, err := n.GetMinerWorkerAddress(ctx, mAddr, tok)
if err != nil {
return nil, err
}

signer, err := n.StateAccountKey(ctx, worker, types.EmptyTSK)
if err != nil {
return nil, err
}
localSignature, err := n.WalletSign(ctx, signer, msgBytes, wallet.MsgMeta{
Type: wallet.MTUnknown,
})
if err != nil {
return nil, err
}
return localSignature, nil
}
}

func (n *ProviderNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return n.fundMgr.Reserve(ctx, wallet, addr, amt)
}
Expand Down Expand Up @@ -400,9 +442,12 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID

// StorageProviderNode are common interfaces provided by a filecoin Node to both StorageClient and StorageProvider
type StorageProviderNode interface {
// SignsBytes signs the given data with the given address's private key
// Sign sign the given data with the given address's private key
Sign(ctx context.Context, data interface{}) (*crypto.Signature, error)

// SignWithGivenMiner sign the data with the worker address of the given miner
SignWithGivenMiner(mAddr address.Address) nwk2.ResigningFunc

// GetChainHead returns a tipset token for the current chain head
GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error)

Expand Down
24 changes: 20 additions & 4 deletions storageadapter/storage_ask.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package storageadapter
import (
"context"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/venus-market/metrics"
"github.com/filecoin-project/venus-market/models/repo"

"github.com/filecoin-project/venus/app/client/apiface"
"github.com/filecoin-project/venus/pkg/wallet"
"golang.org/x/xerrors"
)

type IStorageAsk interface {
Expand Down Expand Up @@ -89,9 +93,21 @@ func (repo *StorageAsk) signAsk(ask *storagemarket.StorageAsk) (*storagemarket.S
if err != nil {
return nil, err
}
sig, err := repo.fullNode.WalletSign(context.TODO(), ask.Miner, nil, wallet.MsgMeta{

// get worker address for miner
ctx := context.TODO()
tok, err := repo.fullNode.ChainHead(ctx)
if err != nil {
return nil, err
}

mi, err := repo.fullNode.StateMinerInfo(ctx, ask.Miner, tok.Key())
if err != nil {
return nil, err
}

sig, err := repo.fullNode.WalletSign(ctx, mi.Worker, askBytes, wallet.MsgMeta{
Type: wallet.MTStorageAsk,
Extra: askBytes,
})
if err != nil {
return nil, err
Expand Down
20 changes: 4 additions & 16 deletions storageadapter/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (storageDealStream *StorageDealStream) HandleAskStream(s network.StorageAsk
Ask: ask,
}

if err := s.WriteAskResponse(resp, storageDealStream.spn.Sign); err != nil {
if err := s.WriteAskResponse(resp, storageDealStream.spn.SignWithGivenMiner(ar.Miner)); err != nil {
log.Errorf("failed to write ask response: %s", err)
return
}
Expand Down Expand Up @@ -151,18 +151,6 @@ func (storageDealStream *StorageDealStream) HandleDealStream(s network.StorageDe
CreationTime: curTime(),
InboundCAR: path,
}

// 3. 判断deal是否存在
md, err = storageDealStream.deals.GetDeal(deal.ProposalCid)
if err != nil {
log.Errorf("failed to check if state for %v exists: %w", deal.ProposalCid, err)
return
}
if md != nil {
log.Errorf("deal `%v` that already exists", deal.ProposalCid)
return
}

err = storageDealStream.deals.SaveDeal(deal)
if err != nil {
log.Errorf("save miner deal to database %w", err)
Expand Down Expand Up @@ -236,7 +224,7 @@ func (storageDealStream *StorageDealStream) HandleDealStatusStream(s network.Dea
Signature: *signature,
}

if err := s.WriteDealStatusResponse(response, storageDealStream.spn.Sign); err != nil {
if err := s.WriteDealStatusResponse(response, storageDealStream.spn.SignWithGivenMiner(mAddr)); err != nil {
log.Warnf("failed to write deal status response: %s", err)
return
}
Expand All @@ -247,13 +235,13 @@ func (storageDealStream *StorageDealStream) resendProposalResponse(s network.Sto
sig, err := storageDealStream.spn.Sign(context.TODO(), &types.SignInfo{
Data: resp,
Type: wallet.MTUnknown,
Addr: address.Address{},
Addr: md.Proposal.Provider,
})
if err != nil {
return xerrors.Errorf("failed to sign response message: %w", err)
}

return s.WriteDealResponse(network.SignedResponse{Response: *resp, Signature: sig}, storageDealStream.spn.Sign)
return s.WriteDealResponse(network.SignedResponse{Response: *resp, Signature: sig}, storageDealStream.spn.SignWithGivenMiner(md.Proposal.Provider))
}

func (storageDealStream *StorageDealStream) processDealStatusRequest(ctx context.Context, request *network.DealStatusRequest) (*storagemarket.ProviderDealState, address.Address, error) {
Expand Down

0 comments on commit 5e66edc

Please sign in to comment.