From 1b494acc9e28473f52119799e7da7964718e2f16 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 25 Jan 2021 11:28:39 +0100 Subject: [PATCH] feat: handle batch publish storage deals message in sealing recovery --- cmd/lotus-storage-miner/init.go | 3 +- extern/storage-sealing/cbor_gen.go | 39 ++- extern/storage-sealing/checks.go | 2 +- extern/storage-sealing/currentdealinfo.go | 209 ++++++++++++ .../storage-sealing/currentdealinfo_test.go | 310 ++++++++++++++++++ extern/storage-sealing/sealing.go | 8 +- extern/storage-sealing/states_failed.go | 28 +- extern/storage-sealing/types.go | 2 + extern/storage-sealing/types_test.go | 37 ++- markets/storageadapter/provider.go | 5 +- storage/adapter_storage_miner.go | 24 +- storage/miner.go | 1 + 12 files changed, 631 insertions(+), 37 deletions(-) create mode 100644 extern/storage-sealing/currentdealinfo.go create mode 100644 extern/storage-sealing/currentdealinfo_test.go diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 008b2ea157e..ae0447489b3 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -311,7 +311,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string, PieceCID: commD, }, DealInfo: &sealing.DealInfo{ - DealID: dealID, + DealID: dealID, + DealProposal: §or.Deal, DealSchedule: sealing.DealSchedule{ StartEpoch: sector.Deal.StartEpoch, EndEpoch: sector.Deal.EndEpoch, diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 70be08ace37..282ff06dc52 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -7,6 +7,7 @@ import ( "io" abi "github.com/filecoin-project/go-state-types/abi" + market "github.com/filecoin-project/specs-actors/actors/builtin/market" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" @@ -135,7 +136,7 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{164}); err != nil { + if _, err := w.Write([]byte{165}); err != nil { return err } @@ -179,6 +180,22 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error { return err } + // t.DealProposal (market.DealProposal) (struct) + if len("DealProposal") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealProposal\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealProposal"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("DealProposal")); err != nil { + return err + } + + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + // t.DealSchedule (sealing.DealSchedule) (struct) if len("DealSchedule") > cbg.MaxLength { return xerrors.Errorf("Value in field \"DealSchedule\" was too long") @@ -283,6 +300,26 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error { } t.DealID = abi.DealID(extra) + } + // t.DealProposal (market.DealProposal) (struct) + case "DealProposal": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + t.DealProposal = new(market.DealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) + } + } + } // t.DealSchedule (sealing.DealSchedule) (struct) case "DealSchedule": diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 249163d6660..db2e87d0e04 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -53,7 +53,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api continue } - proposal, err := api.StateMarketStorageDeal(ctx, p.DealInfo.DealID, tok) + proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok) if err != nil { return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)} } diff --git a/extern/storage-sealing/currentdealinfo.go b/extern/storage-sealing/currentdealinfo.go new file mode 100644 index 00000000000..105a42d1ef4 --- /dev/null +++ b/extern/storage-sealing/currentdealinfo.go @@ -0,0 +1,209 @@ +package sealing + +import ( + "bytes" + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type CurrentDealInfoAPI interface { + ChainGetMessage(context.Context, cid.Cid) (*types.Message, error) + StateLookupID(context.Context, address.Address, TipSetToken) (address.Address, error) + StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error) + StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) +} + +type CurrentDealInfo struct { + DealID abi.DealID + MarketDeal *api.MarketDeal + PublishMsgTipSet TipSetToken +} + +type CurrentDealInfoManager struct { + CDAPI CurrentDealInfoAPI +} + +// GetCurrentDealInfo gets the current deal state and deal ID. +// Note that the deal ID is assigned when the deal is published, so it may +// have changed if there was a reorg after the deal was published. +func (mgr *CurrentDealInfoManager) GetCurrentDealInfo(ctx context.Context, tok TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (CurrentDealInfo, error) { + // Lookup the deal ID by comparing the deal proposal to the proposals in + // the publish deals message, and indexing into the message return value + dealID, pubMsgTok, err := mgr.dealIDFromPublishDealsMsg(ctx, tok, proposal, publishCid) + if err != nil { + return CurrentDealInfo{}, err + } + + // Lookup the deal state by deal ID + marketDeal, err := mgr.CDAPI.StateMarketStorageDeal(ctx, dealID, tok) + if err == nil && proposal != nil { + // Make sure the retrieved deal proposal matches the target proposal + equal, err := mgr.CheckDealEquality(ctx, tok, *proposal, marketDeal.Proposal) + if err != nil { + return CurrentDealInfo{}, err + } + if !equal { + return CurrentDealInfo{}, xerrors.Errorf("Deal proposals for publish message %s did not match", publishCid) + } + } + return CurrentDealInfo{DealID: dealID, MarketDeal: marketDeal, PublishMsgTipSet: pubMsgTok}, err +} + +// dealIDFromPublishDealsMsg looks up the publish deals message by cid, and finds the deal ID +// by looking at the message return value +func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context, tok TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (abi.DealID, TipSetToken, error) { + dealID := abi.DealID(0) + + // Get the return value of the publish deals message + lookup, err := mgr.CDAPI.StateSearchMsg(ctx, publishCid) + if err != nil { + return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err) + } + + if lookup.Receipt.ExitCode != exitcode.Ok { + return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode) + } + + var retval market.PublishStorageDealsReturn + if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { + return dealID, nil, xerrors.Errorf("looking for publish deal message %s: unmarshalling message return: %w", publishCid, err) + } + + // Previously, publish deals messages contained a single deal, and the + // deal proposal was not included in the sealing deal info. + // So check if the proposal is nil and check the number of deals published + // in the message. + if proposal == nil { + if len(retval.IDs) > 1 { + return dealID, nil, xerrors.Errorf( + "getting deal ID from publish deal message %s: "+ + "no deal proposal supplied but message return value has more than one deal (%d deals)", + publishCid, len(retval.IDs)) + } + + // There is a single deal in this publish message and no deal proposal + // was supplied, so we have nothing to compare against. Just assume + // the deal ID is correct. + return retval.IDs[0], lookup.TipSetTok, nil + } + + // Get the parameters to the publish deals message + pubmsg, err := mgr.CDAPI.ChainGetMessage(ctx, publishCid) + if err != nil { + return dealID, nil, xerrors.Errorf("getting publish deal message %s: %w", publishCid, err) + } + + var pubDealsParams market2.PublishStorageDealsParams + if err := pubDealsParams.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { + return dealID, nil, xerrors.Errorf("unmarshalling publish deal message params for message %s: %w", publishCid, err) + } + + // Scan through the deal proposals in the message parameters to find the + // index of the target deal proposal + dealIdx := -1 + for i, paramDeal := range pubDealsParams.Deals { + eq, err := mgr.CheckDealEquality(ctx, tok, *proposal, market.DealProposal(paramDeal.Proposal)) + if err != nil { + return dealID, nil, xerrors.Errorf("comparing publish deal message %s proposal to deal proposal: %w", publishCid, err) + } + if eq { + dealIdx = i + break + } + } + + if dealIdx == -1 { + return dealID, nil, xerrors.Errorf("could not find deal in publish deals message %s", publishCid) + } + + if dealIdx >= len(retval.IDs) { + return dealID, nil, xerrors.Errorf( + "deal index %d out of bounds of deals (len %d) in publish deals message %s", + dealIdx, len(retval.IDs), publishCid) + } + + return retval.IDs[dealIdx], lookup.TipSetTok, nil +} + +func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok TipSetToken, p1, p2 market.DealProposal) (bool, error) { + p1ClientID, err := mgr.CDAPI.StateLookupID(ctx, p1.Client, tok) + if err != nil { + return false, err + } + p2ClientID, err := mgr.CDAPI.StateLookupID(ctx, p2.Client, tok) + if err != nil { + return false, err + } + return p1.PieceCID.Equals(p2.PieceCID) && + p1.PieceSize == p2.PieceSize && + p1.VerifiedDeal == p2.VerifiedDeal && + p1.Label == p2.Label && + p1.StartEpoch == p2.StartEpoch && + p1.EndEpoch == p2.EndEpoch && + p1.StoragePricePerEpoch.Equals(p2.StoragePricePerEpoch) && + p1.ProviderCollateral.Equals(p2.ProviderCollateral) && + p1.ClientCollateral.Equals(p2.ClientCollateral) && + p1.Provider == p2.Provider && + p1ClientID == p2ClientID, nil +} + +type CurrentDealInfoTskAPI interface { + ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) + StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) +} + +type CurrentDealInfoAPIAdapter struct { + CurrentDealInfoTskAPI +} + +func (c *CurrentDealInfoAPIAdapter) StateLookupID(ctx context.Context, a address.Address, tok TipSetToken) (address.Address, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return address.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + return c.CurrentDealInfoTskAPI.StateLookupID(ctx, a, tsk) +} + +func (c *CurrentDealInfoAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok TipSetToken) (*api.MarketDeal, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + return c.CurrentDealInfoTskAPI.StateMarketStorageDeal(ctx, dealID, tsk) +} + +func (c *CurrentDealInfoAPIAdapter) StateSearchMsg(ctx context.Context, k cid.Cid) (*MsgLookup, error) { + wmsg, err := c.CurrentDealInfoTskAPI.StateSearchMsg(ctx, k) + if err != nil { + return nil, err + } + + if wmsg == nil { + return nil, nil + } + + return &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: wmsg.Receipt.ExitCode, + Return: wmsg.Receipt.Return, + GasUsed: wmsg.Receipt.GasUsed, + }, + TipSetTok: wmsg.TipSet.Bytes(), + Height: wmsg.Height, + }, nil +} + +var _ CurrentDealInfoAPI = (*CurrentDealInfoAPIAdapter)(nil) diff --git a/extern/storage-sealing/currentdealinfo_test.go b/extern/storage-sealing/currentdealinfo_test.go new file mode 100644 index 00000000000..ee51d8c75db --- /dev/null +++ b/extern/storage-sealing/currentdealinfo_test.go @@ -0,0 +1,310 @@ +package sealing + +import ( + "bytes" + "errors" + "math/rand" + "sort" + "testing" + "time" + + "golang.org/x/net/context" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + evtmock "github.com/filecoin-project/lotus/chain/events/state/mock" + "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +var errNotFound = errors.New("Could not find") + +func TestGetCurrentDealInfo(t *testing.T) { + ctx := context.Background() + dummyCid, _ := cid.Parse("bafkqaaa") + dummyCid2, _ := cid.Parse("bafkqaab") + zeroDealID := abi.DealID(0) + earlierDealID := abi.DealID(9) + successDealID := abi.DealID(10) + proposal := market.DealProposal{ + PieceCID: dummyCid, + PieceSize: abi.PaddedPieceSize(100), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", + } + otherProposal := market.DealProposal{ + PieceCID: dummyCid2, + PieceSize: abi.PaddedPieceSize(100), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "other", + } + successDeal := &api.MarketDeal{ + Proposal: proposal, + State: market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + }, + } + earlierDeal := &api.MarketDeal{ + Proposal: otherProposal, + State: market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + }, + } + + type testCaseData struct { + searchMessageLookup *MsgLookup + searchMessageErr error + marketDeals map[abi.DealID]*api.MarketDeal + publishCid cid.Cid + targetProposal *market.DealProposal + expectedDealID abi.DealID + expectedMarketDeal *api.MarketDeal + expectedError error + } + testCases := map[string]testCaseData{ + "deal lookup succeeds": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{successDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + successDealID: successDeal, + }, + targetProposal: &proposal, + expectedDealID: successDealID, + expectedMarketDeal: successDeal, + }, + "deal lookup succeeds two return values": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID, successDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + earlierDealID: earlierDeal, + successDealID: successDeal, + }, + targetProposal: &proposal, + expectedDealID: successDealID, + expectedMarketDeal: successDeal, + }, + "deal lookup fails proposal mis-match": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + earlierDealID: earlierDeal, + }, + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("could not find deal in publish deals message %s", dummyCid), + }, + "deal lookup fails mismatch count of deals and return values": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + earlierDealID: earlierDeal, + successDealID: successDeal, + }, + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("deal index 1 out of bounds of deals (len 1) in publish deals message %s", dummyCid), + }, + "deal lookup succeeds, target proposal nil, single deal in message": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{successDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + successDealID: successDeal, + }, + targetProposal: nil, + expectedDealID: successDealID, + expectedMarketDeal: successDeal, + }, + "deal lookup fails, multiple deals in return value but target proposal nil": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID, successDealID}), + }, + }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + earlierDealID: earlierDeal, + successDealID: successDeal, + }, + targetProposal: nil, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("getting deal ID from publish deal message %s: no deal proposal supplied but message return value has more than one deal (2 deals)", dummyCid), + }, + "search message fails": { + publishCid: dummyCid, + searchMessageErr: errors.New("something went wrong"), + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid), + }, + "return code not ok": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.ErrIllegalState, + }, + }, + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState), + }, + "unable to unmarshal params": { + publishCid: dummyCid, + searchMessageLookup: &MsgLookup{ + Receipt: MessageReceipt{ + ExitCode: exitcode.Ok, + Return: []byte("applesauce"), + }, + }, + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: unmarshalling message return: cbor input should be of type array", dummyCid), + }, + } + runTestCase := func(testCase string, data testCaseData) { + t.Run(testCase, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + ts, err := evtmock.MockTipset(address.TestAddress, rand.Uint64()) + require.NoError(t, err) + marketDeals := make(map[marketDealKey]*api.MarketDeal) + for dealID, deal := range data.marketDeals { + marketDeals[marketDealKey{dealID, ts.Key()}] = deal + } + mockApi := &CurrentDealInfoMockAPI{ + SearchMessageLookup: data.searchMessageLookup, + SearchMessageErr: data.searchMessageErr, + MarketDeals: marketDeals, + } + dealInfoMgr := CurrentDealInfoManager{mockApi} + + res, err := dealInfoMgr.GetCurrentDealInfo(ctx, ts.Key().Bytes(), data.targetProposal, data.publishCid) + require.Equal(t, data.expectedDealID, res.DealID) + require.Equal(t, data.expectedMarketDeal, res.MarketDeal) + if data.expectedError == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, data.expectedError.Error()) + } + }) + } + for testCase, data := range testCases { + runTestCase(testCase, data) + } +} + +type marketDealKey struct { + abi.DealID + types.TipSetKey +} + +type CurrentDealInfoMockAPI struct { + SearchMessageLookup *MsgLookup + SearchMessageErr error + + MarketDeals map[marketDealKey]*api.MarketDeal +} + +func (mapi *CurrentDealInfoMockAPI) ChainGetMessage(ctx context.Context, c cid.Cid) (*types.Message, error) { + var dealIDs []abi.DealID + var deals []market2.ClientDealProposal + for k, dl := range mapi.MarketDeals { + dealIDs = append(dealIDs, k.DealID) + deals = append(deals, market2.ClientDealProposal{ + Proposal: market2.DealProposal(dl.Proposal), + ClientSignature: crypto.Signature{ + Data: []byte("foo bar cat dog"), + Type: crypto.SigTypeBLS, + }, + }) + } + sort.SliceStable(deals, func(i, j int) bool { + return dealIDs[i] < dealIDs[j] + }) + buf := new(bytes.Buffer) + params := market2.PublishStorageDealsParams{Deals: deals} + err := params.MarshalCBOR(buf) + if err != nil { + panic(err) + } + return &types.Message{ + Params: buf.Bytes(), + }, nil +} + +func (mapi *CurrentDealInfoMockAPI) StateLookupID(ctx context.Context, addr address.Address, token TipSetToken) (address.Address, error) { + return addr, nil +} + +func (mapi *CurrentDealInfoMockAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok TipSetToken) (*api.MarketDeal, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, err + } + deal, ok := mapi.MarketDeals[marketDealKey{dealID, tsk}] + if !ok { + return nil, errNotFound + } + return deal, nil +} + +func (mapi *CurrentDealInfoMockAPI) StateSearchMsg(ctx context.Context, c cid.Cid) (*MsgLookup, error) { + if mapi.SearchMessageLookup == nil { + return mapi.SearchMessageLookup, mapi.SearchMessageErr + } + + return mapi.SearchMessageLookup, mapi.SearchMessageErr +} + +func makePublishDealsReturnBytes(t *testing.T, dealIDs []abi.DealID) []byte { + buf := new(bytes.Buffer) + dealsReturn := market.PublishStorageDealsReturn{ + IDs: dealIDs, + } + err := dealsReturn.MarshalCBOR(buf) + require.NoError(t, err) + return buf.Bytes() +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 96d63efdce2..fe79d09ec20 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" ) @@ -53,18 +54,21 @@ type SealingAPI interface { StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) + StateLookupID(context.Context, address.Address, TipSetToken) (address.Address, error) StateMinerSectorSize(context.Context, address.Address, TipSetToken) (abi.SectorSize, error) StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) - StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) + StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error) + StateMarketStorageDealProposal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) + ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) ChainGetRandomnessFromTickets(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) @@ -100,6 +104,7 @@ type Sealing struct { terminator *TerminateBatcher getConfig GetSealingConfigFunc + dealInfo *CurrentDealInfoManager } type FeeConfig struct { @@ -145,6 +150,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc), getConfig: gc, + dealInfo: &CurrentDealInfoManager{api}, stats: SectorStats{ bySector: map[abi.SectorID]statSectorState{}, diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index e76b7883c11..24008a8039c 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -1,7 +1,6 @@ package sealing import ( - "bytes" "time" "golang.org/x/xerrors" @@ -365,7 +364,7 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn continue } - proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok) + proposal, err := m.api.StateMarketStorageDealProposal(ctx.Context(), p.DealInfo.DealID, tok) if err != nil { log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) toFix = append(toFix, i) @@ -408,26 +407,17 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn return ctx.Send(SectorRemove{}) } - ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid) - if err != nil { - return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err) - } - - if ml.Receipt.ExitCode != exitcode.Ok { - return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode) + var dp *market.DealProposal + if p.DealInfo.DealProposal != nil { + mdp := market.DealProposal(*p.DealInfo.DealProposal) + dp = &mdp } - - var retval market.PublishStorageDealsReturn - if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil { - return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) - } - - if len(retval.IDs) != 1 { - // market currently only ever sends messages with 1 deal - return xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + res, err := m.dealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid) + if err != nil { + return xerrors.Errorf("recovering deal ID for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err) } - updates[i] = retval.IDs[0] + updates[i] = res.DealID } // Not much to do here, we can't go back in time to commit this sector diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 1d507362207..95bf9c303aa 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" ) // Piece is a tuple of piece and deal info @@ -32,6 +33,7 @@ type Piece struct { type DealInfo struct { PublishCid *cid.Cid DealID abi.DealID + DealProposal *market.DealProposal DealSchedule DealSchedule KeepUnsealed bool } diff --git a/extern/storage-sealing/types_test.go b/extern/storage-sealing/types_test.go index 0b3c9703212..aa314c37a68 100644 --- a/extern/storage-sealing/types_test.go +++ b/extern/storage-sealing/types_test.go @@ -4,26 +4,41 @@ import ( "bytes" "testing" + "github.com/ipfs/go-cid" + "gotest.tools/assert" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" - builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" ) -func TestSectorInfoSelialization(t *testing.T) { +func TestSectorInfoSerialization(t *testing.T) { d := abi.DealID(1234) + dummyCid, err := cid.Parse("bafkqaaa") + if err != nil { + t.Fatal(err) + } + dealInfo := DealInfo{ DealID: d, DealSchedule: DealSchedule{ StartEpoch: 0, EndEpoch: 100, }, + DealProposal: &market2.DealProposal{ + PieceCID: dummyCid, + PieceSize: 5, + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(10), + ProviderCollateral: abi.NewTokenAmount(20), + ClientCollateral: abi.NewTokenAmount(15), + }, } - dummyCid := builtin2.AccountActorCodeID - si := &SectorInfo{ State: "stateful", SectorNumber: 234, @@ -53,18 +68,18 @@ func TestSectorInfoSelialization(t *testing.T) { } var si2 SectorInfo - if err := cborutil.ReadCborRPC(bytes.NewReader(b), &si); err != nil { + if err := cborutil.ReadCborRPC(bytes.NewReader(b), &si2); err != nil { + t.Fatal(err) return } assert.Equal(t, si.State, si2.State) assert.Equal(t, si.SectorNumber, si2.SectorNumber) - assert.Equal(t, si.Pieces, si2.Pieces) - assert.Equal(t, si.CommD, si2.CommD) - assert.Equal(t, si.TicketValue, si2.TicketValue) + assert.Equal(t, si.Pieces[0].DealInfo.DealID, si2.Pieces[0].DealInfo.DealID) + assert.Equal(t, si.Pieces[0].DealInfo.DealProposal.PieceCID, si2.Pieces[0].DealInfo.DealProposal.PieceCID) + assert.Equal(t, *si.CommD, *si2.CommD) + assert.DeepEqual(t, si.TicketValue, si2.TicketValue) + assert.Equal(t, si.TicketEpoch, si2.TicketEpoch) assert.Equal(t, si.TicketEpoch, si2.TicketEpoch) - - assert.Equal(t, si, si2) - } diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 085888ee31a..0241b8b2d24 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -108,8 +108,9 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema } sdInfo := sealing.DealInfo{ - DealID: deal.DealID, - PublishCid: deal.PublishCid, + DealID: deal.DealID, + DealProposal: &deal.Proposal, + PublishCid: deal.PublishCid, DealSchedule: sealing.DealSchedule{ StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch, EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 20bf308253d..a648c9fcc4f 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -252,7 +252,25 @@ func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr addre return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk) } -func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, error) { +func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Address, tok sealing.TipSetToken) (address.Address, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return address.Undef, err + } + + return s.delegate.StateLookupID(ctx, addr, tsk) +} + +func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (*api.MarketDeal, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, err + } + + return s.delegate.StateMarketStorageDeal(ctx, dealID, tsk) +} + +func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return market.DealProposal{}, err @@ -310,6 +328,10 @@ func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing.TipSetToken, return head.Key().Bytes(), head.Height(), nil } +func (s SealingAPIAdapter) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) { + return s.delegate.ChainGetMessage(ctx, mc) +} + func (s SealingAPIAdapter) ChainGetRandomnessFromBeacon(ctx context.Context, tok sealing.TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { diff --git a/storage/miner.go b/storage/miner.go index 752d7ff4218..a43781039fa 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -106,6 +106,7 @@ type storageMinerApi interface { ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainHasObj(context.Context, cid.Cid) (bool, error) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error)