diff --git a/markets/loggers/loggers.go b/markets/loggers/loggers.go index 87c8dfe65a7..e5f669f2f5c 100644 --- a/markets/loggers/loggers.go +++ b/markets/loggers/loggers.go @@ -12,22 +12,22 @@ var log = logging.Logger("markets") // StorageClientLogger logs events from the storage client func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { - log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // StorageProviderLogger logs events from the storage provider func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { - log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // RetrievalClientLogger logs events from the retrieval client func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // RetrievalProviderLogger logs events from the retrieval provider func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // DataTransferLogger logs events from the data transfer module diff --git a/markets/storageadapter/api.go b/markets/storageadapter/api.go new file mode 100644 index 00000000000..9d89c7aa402 --- /dev/null +++ b/markets/storageadapter/api.go @@ -0,0 +1,53 @@ +package storageadapter + +import ( + "context" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors/adt" + + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" +) + +type apiWrapper struct { + api interface { + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) + ChainHasObj(context.Context, cid.Cid) (bool, error) + } +} + +func (ca *apiWrapper) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca.api))) + + preAct, err := ca.api.StateGetActor(ctx, actor, pre) + if err != nil { + return nil, xerrors.Errorf("getting pre actor: %w", err) + } + curAct, err := ca.api.StateGetActor(ctx, actor, cur) + if err != nil { + return nil, xerrors.Errorf("getting cur actor: %w", err) + } + + preSt, err := miner.Load(store, preAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + curSt, err := miner.Load(store, curAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(preSt, curSt) + if err != nil { + return nil, xerrors.Errorf("diff precommits: %w", err) + } + + return diff, err +} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 88a50931a18..f3491da4756 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -34,9 +34,8 @@ import ( ) type ClientNodeAdapter struct { - full.StateAPI - full.ChainAPI - full.MpoolAPI + *clientApi + *apiWrapper fundmgr *market.FundManager ev *events.Events @@ -46,14 +45,14 @@ type ClientNodeAdapter struct { type clientApi struct { full.ChainAPI full.StateAPI + full.MpoolAPI } func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode { - capi := &clientApi{chain, stateapi} + capi := &clientApi{chain, stateapi, mpool} return &ClientNodeAdapter{ - StateAPI: stateapi, - ChainAPI: chain, - MpoolAPI: mpool, + clientApi: capi, + apiWrapper: &apiWrapper{api: capi}, fundmgr: fundmgr, ev: events.NewEvents(context.TODO(), capi), diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index ab8c3f52fc6..97311a0b284 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -18,47 +19,49 @@ type getCurrentDealInfoAPI interface { StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) + + diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) } // GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed -func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) { +func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) { marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) if dealErr == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if equal { - return dealID, marketDeal, nil + return dealID, marketDeal, types.EmptyTSK, nil } dealErr = xerrors.Errorf("Deal proposals did not match") } if publishCid == nil { - return dealID, nil, dealErr + return dealID, nil, types.EmptyTSK, dealErr } // attempt deal id correction lookup, err := api.StateSearchMsg(ctx, *publishCid) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if lookup.Receipt.ExitCode != exitcode.Ok { - return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) } var retval market.PublishStorageDealsReturn if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) } if len(retval.IDs) != 1 { // market currently only ever sends messages with 1 deal - return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") } if retval.IDs[0] == dealID { // DealID did not change, so we are stuck with the original lookup error - return dealID, nil, dealErr + return dealID, nil, lookup.TipSet, dealErr } dealID = retval.IDs[0] @@ -67,13 +70,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea if err == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if !equal { - return dealID, nil, xerrors.Errorf("Deal proposals did not match") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match") } } - return dealID, marketDeal, err + return dealID, marketDeal, lookup.TipSet, err } func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) { diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go index ed5d36c5b8c..5e3c10495d0 100644 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ b/markets/storageadapter/getcurrentdealinfo_test.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" test "github.com/filecoin-project/lotus/chain/events/state/mock" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" @@ -209,7 +210,7 @@ func TestGetCurrentDealInfo(t *testing.T) { MarketDeals: marketDeals, } - dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) + dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) require.Equal(t, data.expectedDealID, dealID) require.Equal(t, data.expectedMarketDeal, marketDeal) if data.expectedError == nil { @@ -236,6 +237,10 @@ type mockGetCurrentDealInfoAPI struct { MarketDeals map[marketDealKey]*api.MarketDeal } +func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + return &miner.PreCommitChanges{}, nil +} + func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}] if !ok { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index bd59da7503f..5466c81ef4a 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -5,16 +5,18 @@ import ( "context" "sync" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" ) type sectorCommittedEventsAPI interface { @@ -32,7 +34,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -46,6 +48,36 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } + // Check that precommits which landed between when the deal was published + // and now don't already contain the deal we care about. + // (this can happen when the precommit lands vary quickly (in tests), or + // when the client node was down after the deal was published, and when + // the precommit containing it landed on chain) + + if publishTs == types.EmptyTSK { + lookup, err := api.StateSearchMsg(ctx, *publishCid) + if err != nil { + return false, false, err + } + if lookup != nil { // can be nil in tests + publishTs = lookup.TipSet + } + } + + diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key()) + if err != nil { + return false, false, err + } + + for _, info := range diff.Added { + for _, d := range info.Info.DealIDs { + if d == di { + cb(info.Info.SectorNumber, false, nil) + return true, false, nil + } + } + } + // Not yet active, start matching against incoming messages return false, true, nil } @@ -88,7 +120,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // When the deal is published, the deal ID may change, so get the // current deal ID from the publish message CID - dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, err } @@ -130,7 +162,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + _, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -186,7 +218,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // Get the deal info - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + _, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } @@ -216,22 +248,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) { + di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active if sd.State.SectorStartEpoch > 0 { - return true, nil + return 0, true, publishTs, nil } // Sector was slashed if sd.State.SlashEpoch > 0 { - return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) } - return false, nil + return di, false, publishTs, nil } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index b74a1e53213..dea1f89d2e9 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -161,8 +161,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) { deals: map[abi.DealID]*api.MarketDeal{}, }, }, - expectedCBCallCount: 1, - expectedCBError: errors.New("handling applied event: something went wrong"), + expectedCBCallCount: 0, expectedError: errors.New("failed to set up called handler: something went wrong"), }, "proposed deal epoch timeout": { diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 4ce32d2bfa0..085888ee31a 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -41,6 +41,7 @@ var log = logging.Logger("storageadapter") type ProviderNodeAdapter struct { api.FullNode + *apiWrapper // this goes away with the data transfer module dag dtypes.StagingDAG @@ -55,7 +56,8 @@ type ProviderNodeAdapter struct { func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { na := &ProviderNodeAdapter{ - FullNode: full, + FullNode: full, + apiWrapper: &apiWrapper{api: full}, dag: dag, secb: secb,