Skip to content

Commit

Permalink
Add DealStages to track and log Deal status updates (#502)
Browse files Browse the repository at this point in the history
* Add DealStages field to track and keep history of lifecycle of deal

* fixup

* fixup

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* decrease log level to debug

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Update storagemarket/impl/clientstates/client_fsm.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* explicit set of log with empty value

* fix test

* fix: dont panic when adding log to nil DealStages

* add godocs to deal stages objects.

Co-authored-by: dirkmc <dirkmdev@gmail.com>
Co-authored-by: raulk <raul@protocol.ai>
  • Loading branch information
3 people authored Mar 30, 2021
1 parent a247678 commit 39a8025
Show file tree
Hide file tree
Showing 8 changed files with 788 additions and 11 deletions.
1 change: 1 addition & 0 deletions shared_testutil/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposa
Miner: p,
MinerWorker: address.TestAddress2,
DataRef: MakeTestDataRef(manualXfer),
DealStages: storagemarket.NewDealStages(),
}, nil
}

Expand Down
67 changes: 67 additions & 0 deletions storagemarket/dealstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,70 @@ var DealStates = map[StorageDealStatus]string{
StorageDealClientTransferRestart: "StorageDealClientTransferRestart",
StorageDealProviderTransferAwaitRestart: "StorageDealProviderTransferAwaitRestart",
}

// DealStatesDescriptions maps StorageDealStatus codes to string description for better UX
var DealStatesDescriptions = map[StorageDealStatus]string{
StorageDealUnknown: "Unknown",
StorageDealProposalNotFound: "Proposal not found",
StorageDealProposalRejected: "Proposal rejected",
StorageDealProposalAccepted: "Proposal accepted",
StorageDealAcceptWait: "AcceptWait",
StorageDealStartDataTransfer: "Starting data transfer",
StorageDealStaged: "Staged",
StorageDealAwaitingPreCommit: "Awaiting a PreCommit message on chain",
StorageDealSealing: "Sealing",
StorageDealActive: "Active",
StorageDealExpired: "Expired",
StorageDealSlashed: "Slashed",
StorageDealRejecting: "Rejecting",
StorageDealFailing: "Failing",
StorageDealFundsReserved: "FundsReserved",
StorageDealCheckForAcceptance: "Checking for deal acceptance",
StorageDealValidating: "Validating",
StorageDealTransferring: "Transferring",
StorageDealWaitingForData: "Waiting for data",
StorageDealVerifyData: "Verifying data",
StorageDealReserveProviderFunds: "Reserving provider funds",
StorageDealReserveClientFunds: "Reserving client funds",
StorageDealProviderFunding: "Provider funding",
StorageDealClientFunding: "Client funding",
StorageDealPublish: "Publish",
StorageDealPublishing: "Publishing",
StorageDealError: "Error",
StorageDealFinalizing: "Finalizing",
StorageDealClientTransferRestart: "Client transfer restart",
StorageDealProviderTransferAwaitRestart: "ProviderTransferAwaitRestart",
}

var DealStatesDurations = map[StorageDealStatus]string{
StorageDealUnknown: "",
StorageDealProposalNotFound: "",
StorageDealProposalRejected: "",
StorageDealProposalAccepted: "a few minutes",
StorageDealAcceptWait: "a few minutes",
StorageDealStartDataTransfer: "a few minutes",
StorageDealStaged: "a few minutes",
StorageDealAwaitingPreCommit: "a few minutes",
StorageDealSealing: "a few hours",
StorageDealActive: "",
StorageDealExpired: "",
StorageDealSlashed: "",
StorageDealRejecting: "",
StorageDealFailing: "",
StorageDealFundsReserved: "a few minutes",
StorageDealCheckForAcceptance: "a few minutes",
StorageDealValidating: "a few minutes",
StorageDealTransferring: "a few minutes",
StorageDealWaitingForData: "a few minutes",
StorageDealVerifyData: "a few minutes",
StorageDealReserveProviderFunds: "a few minutes",
StorageDealReserveClientFunds: "a few minutes",
StorageDealProviderFunding: "a few minutes",
StorageDealClientFunding: "a few minutes",
StorageDealPublish: "a few minutes",
StorageDealPublishing: "a few minutes",
StorageDealError: "",
StorageDealFinalizing: "a few minutes",
StorageDealClientTransferRestart: "depending on data size, anywhere between a few minutes to a few hours",
StorageDealProviderTransferAwaitRestart: "a few minutes",
}
1 change: 1 addition & 0 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (c *Client) ProposeStorageDeal(ctx context.Context, params storagemarket.Pr
DataRef: params.Data,
FastRetrieval: params.FastRetrieval,
StoreID: params.StoreID,
DealStages: storagemarket.NewDealStages(),
CreationTime: curTime(),
}

Expand Down
60 changes: 54 additions & 6 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealReserveClientFunds).To(storagemarket.StorageDealClientFunding).
Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error {
deal.AddFundsCid = &mcid
deal.AddLog("reserving funds for storage deal, message cid: <%s>", mcid)
return nil
}),
fsm.Event(storagemarket.ClientEventReserveFundsFailed).
FromMany(storagemarket.StorageDealClientFunding, storagemarket.StorageDealReserveClientFunds).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventFundsReserved).
Expand All @@ -38,12 +40,14 @@ var ClientEvents = fsm.Events{
} else {
deal.FundsReserved = big.Add(deal.FundsReserved, fundsReserved)
}
deal.AddLog("funds reserved, amount <%s>", fundsReserved)
return nil
}),
fsm.Event(storagemarket.ClientEventFundsReleased).
FromMany(storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealFailing).ToJustRecord().
Action(func(deal *storagemarket.ClientDeal, fundsReleased abi.TokenAmount) error {
deal.FundsReserved = big.Subtract(deal.FundsReserved, fundsReleased)
deal.AddLog("funds released, amount <%s>", fundsReleased)
return nil
}),
fsm.Event(storagemarket.ClientEventFundingComplete).
Expand All @@ -52,48 +56,58 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("sending proposal to storage provider failed: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
deal.Message = xerrors.Errorf("error reading Response message from provider: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "unable to verify signature on deal response"
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventInitiateDataTransfer).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealStartDataTransfer),

From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealStartDataTransfer).
Action(func(deal *storagemarket.ClientDeal) error {
deal.AddLog("opening data transfer to storage provider")
return nil
}),
fsm.Event(storagemarket.ClientEventUnexpectedDealState).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, status storagemarket.StorageDealStatus, providerMessage string) error {
deal.Message = xerrors.Errorf("unexpected deal status while waiting for data request: %d (%s). Provider message: %s", status, storagemarket.DealStates[status], providerMessage).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferFailed).
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferRestartFailed).From(storagemarket.StorageDealClientTransferRestart).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to restart data transfer: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferInitiated).
FromMany(storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealTransferring).
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
deal.AddLog("data transfer initiated on channel id <%s>", channelId)
return nil
}),

Expand All @@ -103,6 +117,7 @@ var ClientEvents = fsm.Events{
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
deal.Message = ""
deal.AddLog("data transfer restarted on channel id <%s>", channelId)
return nil
}),

Expand All @@ -111,6 +126,7 @@ var ClientEvents = fsm.Events{
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("could not complete data transfer, could not connect to provider %s", deal.Miner).Error()
deal.AddLog(deal.Message)
return nil
}),

Expand All @@ -123,6 +139,7 @@ var ClientEvents = fsm.Events{
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "data transfer cancelled"
deal.AddLog(deal.Message)
return nil
}),

Expand All @@ -137,70 +154,96 @@ var ClientEvents = fsm.Events{
deal.PollErrorCount++
}
deal.Message = fmt.Sprintf("Provider state: %s", storagemarket.DealStates[providerState])
switch storagemarket.DealStates[providerState] {
case "StorageDealVerifyData":
deal.AddLog("provider is verifying the data")
case "StorageDealPublish":
deal.AddLog("waiting for provider to publish the deal on-chain") // TODO: is that right?
case "StorageDealPublishing":
deal.AddLog("provider has submitted the deal on-chain and is waiting for confirmation") // TODO: is that right?
case "StorageDealProviderFunding":
deal.AddLog("waiting for provider to lock collateral on-chain") // TODO: is that right?
default:
deal.AddLog(deal.Message)
}
return nil
}),
fsm.Event(storagemarket.ClientEventResponseDealDidNotMatch).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, responseCid cid.Cid, proposalCid cid.Cid) error {
deal.Message = xerrors.Errorf("miner responded to a wrong proposal: %s != %s", responseCid, proposalCid).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealRejected).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, state storagemarket.StorageDealStatus, reason string) error {
deal.Message = xerrors.Errorf("deal failed: (State=%d) %s", state, reason).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealAccepted).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealProposalAccepted).
Action(func(deal *storagemarket.ClientDeal, publishMessage *cid.Cid) error {
deal.PublishMessage = publishMessage
deal.Message = ""
deal.AddLog("deal has been accepted by storage provider")
return nil
}),
fsm.Event(storagemarket.ClientEventStreamCloseError).
FromAny().To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error attempting to close stream: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealPublishFailed).
From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error validating deal published: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealPublished).
From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealAwaitingPreCommit).
Action(func(deal *storagemarket.ClientDeal, dealID abi.DealID) error {
deal.DealID = dealID
deal.AddLog("")
return nil
}),
fsm.Event(storagemarket.ClientEventDealPrecommitFailed).
From(storagemarket.StorageDealAwaitingPreCommit).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error awaiting deal pre-commit: %w", err).Error()
deal.Message = xerrors.Errorf("error waiting for deal pre-commit message to appear on chain: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealPrecommitted).
From(storagemarket.StorageDealAwaitingPreCommit).To(storagemarket.StorageDealSealing).
Action(func(deal *storagemarket.ClientDeal, sectorNumber abi.SectorNumber) error {
deal.SectorNumber = sectorNumber
deal.AddLog("deal pre-commit message has landed on chain")
return nil
}),
fsm.Event(storagemarket.ClientEventDealActivationFailed).
From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error in deal activation: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventDealActivated).
FromMany(storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing).
To(storagemarket.StorageDealActive),
To(storagemarket.StorageDealActive).
Action(func(deal *storagemarket.ClientDeal) error {
deal.AddLog("deal activated")
return nil
}),
fsm.Event(storagemarket.ClientEventDealSlashed).
From(storagemarket.StorageDealActive).To(storagemarket.StorageDealSlashed).
Action(func(deal *storagemarket.ClientDeal, slashEpoch abi.ChainEpoch) error {
deal.SlashEpoch = slashEpoch
deal.AddLog("deal slashed at epoch <%d>", slashEpoch)
return nil
}),
fsm.Event(storagemarket.ClientEventDealExpired).
Expand All @@ -209,10 +252,15 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealActive).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error waiting for deal completion: %w", err).Error()
deal.AddLog(deal.Message)
return nil
}),
fsm.Event(storagemarket.ClientEventFailed).
From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError),
From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal) error {
deal.AddLog("")
return nil
}),
fsm.Event(storagemarket.ClientEventRestart).From(storagemarket.StorageDealTransferring).To(storagemarket.StorageDealClientTransferRestart).
FromAny().ToNoChange(),
}
Expand Down
6 changes: 3 additions & 3 deletions storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestProposeDeal(t *testing.T) {
},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
assert.Equal(t, "error reading Response message: read response failed", deal.Message)
assert.Equal(t, "error reading Response message from provider: read response failed", deal.Message)
},
})
})
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestVerifyDealPreCommitted(t *testing.T) {
nodeParams: nodeParams{DealPreCommittedSyncError: errors.New("Something went wrong")},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealError, deal.State)
assert.Equal(t, "error awaiting deal pre-commit: Something went wrong", deal.Message)
assert.Equal(t, "error waiting for deal pre-commit message to appear on chain: Something went wrong", deal.Message)
},
})
})
Expand All @@ -446,7 +446,7 @@ func TestVerifyDealPreCommitted(t *testing.T) {
nodeParams: nodeParams{DealPreCommittedAsyncError: errors.New("Something went wrong later")},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealError, deal.State)
assert.Equal(t, "error awaiting deal pre-commit: Something went wrong later", deal.Message)
assert.Equal(t, "error waiting for deal pre-commit message to appear on chain: Something went wrong later", deal.Message)
},
})
})
Expand Down
Loading

0 comments on commit 39a8025

Please sign in to comment.