Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Mar 4, 2021
1 parent d5b1990 commit 73a75b6
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 6 deletions.
1 change: 1 addition & 0 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
BytesPaidFor: 0,
PaymentRequested: abi.NewTokenAmount(0),
FundsSpent: abi.NewTokenAmount(0),
DealStages: &retrievalmarket.DealStages{},
Status: retrievalmarket.DealStatusNew,
Sender: p.ID,
UnsealFundsPaid: big.Zero(),
Expand Down
1 change: 1 addition & 0 deletions retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var ClientEvents = fsm.Events{
FromAny().To(rm.DealStatusErrored).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("proposing deal: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(rm.ClientEventDealProposed).
Expand Down
56 changes: 56 additions & 0 deletions retrievalmarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"errors"
"fmt"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -23,6 +25,12 @@ import (
"github.com/filecoin-project/go-fil-markets/piecestore"
)

var log = logging.Logger("retrievalmrkt")

//go:generate cbor-gen-for DealStage

//go:generate cbor-gen-for DealStages

//go:generate cbor-gen-for --map-encoding Query QueryResponse DealProposal DealResponse Params QueryParams DealPayment ClientDealState ProviderDealState PaymentInfo RetrievalPeer Ask

// QueryProtocolID is the protocol for querying information about retrieval
Expand All @@ -42,6 +50,53 @@ type PaymentInfo struct {
Lane uint64
}

func NewDealStages() *DealStages {
return &DealStages{}
}

type DealStages struct {
Stages []*DealStage
}

func (ds *DealStages) GetStage(stage string) *DealStage {
for _, s := range ds.Stages {
if s.Name == stage {
return s
}
}

return nil
}

func (ds *DealStages) AddStageLog(stage, msg string) {
log.Debugf("adding log for stage <%s> msg <%s>", stage, msg)

st := ds.GetStage(stage)
if st == nil {
st = &DealStage{
CreatedAt: time.Now(),
}
ds.Stages = append(ds.Stages, st)
}

st.UpdatedAt = time.Now()
st.Logs = append(st.Logs, msg)
}

type DealStage struct {
Name string
CreatedAt time.Time
UpdatedAt time.Time
Logs []string
}

func (d *ClientDealState) AddLog() {
msg := d.Message
stage := DealStatuses[d.Status]

d.DealStages.AddStageLog(stage, msg)
}

// ClientDealState is the current state of a deal from the point of view
// of a retrieval client
type ClientDealState struct {
Expand All @@ -59,6 +114,7 @@ type ClientDealState struct {
Sender peer.ID
TotalReceived uint64
Message string
DealStages *DealStages
BytesPaidFor uint64
CurrentInterval uint64
PaymentRequested abi.TokenAmount
Expand Down
38 changes: 37 additions & 1 deletion retrievalmarket/types_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (c *Client) ProposeStorageDeal(ctx context.Context, params storagemarket.Pr
DataRef: params.Data,
FastRetrieval: params.FastRetrieval,
StoreID: params.StoreID,
DealStages: storagemarket.NewDealStages(),
CreationTime: curTime(),
}

Expand Down
39 changes: 36 additions & 3 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealReserveClientFunds).To(storagemarket.StorageDealClientFunding).
Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error {
deal.AddFundsCid = &mcid
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventReserveFundsFailed).
FromMany(storagemarket.StorageDealClientFunding, storagemarket.StorageDealReserveClientFunds).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("adding market funds failed: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventFundsReserved).
Expand All @@ -38,12 +40,14 @@ var ClientEvents = fsm.Events{
} else {
deal.FundsReserved = big.Add(deal.FundsReserved, fundsReserved)
}
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventFundsReleased).
FromMany(storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealFailing).ToJustRecord().
Action(func(deal *storagemarket.ClientDeal, fundsReleased abi.TokenAmount) error {
deal.FundsReserved = big.Subtract(deal.FundsReserved, fundsReleased)
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventFundingComplete).
Expand All @@ -52,48 +56,58 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("sending proposal to storage provider failed: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "unable to verify signature on deal response"
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventInitiateDataTransfer).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealStartDataTransfer),

From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealStartDataTransfer).
Action(func(deal *storagemarket.ClientDeal) error {
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventUnexpectedDealState).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, status storagemarket.StorageDealStatus, providerMessage string) error {
deal.Message = xerrors.Errorf("unexpected deal status while waiting for data request: %d (%s). Provider message: %s", status, storagemarket.DealStates[status], providerMessage).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferFailed).
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
deal.AddLog()
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferRestartFailed).From(storagemarket.StorageDealClientTransferRestart).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to restart data transfer: %w", err).Error()
deal.AddLog()
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferInitiated).
FromMany(storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealTransferring).
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
deal.AddLog()
return nil
}),

Expand All @@ -103,6 +117,7 @@ var ClientEvents = fsm.Events{
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
deal.Message = ""
deal.AddLog()
return nil
}),

Expand All @@ -111,6 +126,7 @@ var ClientEvents = fsm.Events{
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("could not complete data transfer, could not connect to provider %s", deal.Miner).Error()
deal.AddLog()
return nil
}),

Expand All @@ -123,6 +139,7 @@ var ClientEvents = fsm.Events{
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "data transfer cancelled"
deal.AddLog()
return nil
}),

Expand All @@ -137,61 +154,71 @@ var ClientEvents = fsm.Events{
deal.PollErrorCount++
}
deal.Message = fmt.Sprintf("Provider state: %s", storagemarket.DealStates[providerState])
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventResponseDealDidNotMatch).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, responseCid cid.Cid, proposalCid cid.Cid) error {
deal.Message = xerrors.Errorf("miner responded to a wrong proposal: %s != %s", responseCid, proposalCid).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealRejected).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, state storagemarket.StorageDealStatus, reason string) error {
deal.Message = xerrors.Errorf("deal failed: (State=%d) %s", state, reason).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealAccepted).
From(storagemarket.StorageDealCheckForAcceptance).To(storagemarket.StorageDealProposalAccepted).
Action(func(deal *storagemarket.ClientDeal, publishMessage *cid.Cid) error {
deal.PublishMessage = publishMessage
deal.Message = ""
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventStreamCloseError).
FromAny().To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error attempting to close stream: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealPublishFailed).
From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error validating deal published: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealPublished).
From(storagemarket.StorageDealProposalAccepted).To(storagemarket.StorageDealAwaitingPreCommit).
Action(func(deal *storagemarket.ClientDeal, dealID abi.DealID) error {
deal.DealID = dealID
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealPrecommitFailed).
From(storagemarket.StorageDealAwaitingPreCommit).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error awaiting deal pre-commit: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealPrecommitted).
From(storagemarket.StorageDealAwaitingPreCommit).To(storagemarket.StorageDealSealing).
Action(func(deal *storagemarket.ClientDeal, sectorNumber abi.SectorNumber) error {
deal.SectorNumber = sectorNumber
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealActivationFailed).
From(storagemarket.StorageDealSealing).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error in deal activation: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealActivated).
Expand All @@ -201,6 +228,7 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealActive).To(storagemarket.StorageDealSlashed).
Action(func(deal *storagemarket.ClientDeal, slashEpoch abi.ChainEpoch) error {
deal.SlashEpoch = slashEpoch
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventDealExpired).
Expand All @@ -209,10 +237,15 @@ var ClientEvents = fsm.Events{
From(storagemarket.StorageDealActive).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error waiting for deal completion: %w", err).Error()
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventFailed).
From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError),
From(storagemarket.StorageDealFailing).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.AddLog()
return nil
}),
fsm.Event(storagemarket.ClientEventRestart).From(storagemarket.StorageDealTransferring).To(storagemarket.StorageDealClientTransferRestart).
FromAny().ToNoChange(),
}
Expand Down
Loading

0 comments on commit 73a75b6

Please sign in to comment.