Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync up DealState to match spec #50

Merged
merged 2 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Client struct {
}

type clientDealUpdate struct {
newState storagemarket.DealState
newState storagemarket.StorageDealStatus
id cid.Cid
err error
mut func(*ClientDeal)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (c *Client) onIncoming(deal *ClientDeal) {

go func() {
c.updated <- clientDealUpdate{
newState: storagemarket.DealUnknown,
newState: storagemarket.StorageDealUnknown,
id: deal.ProposalCid,
err: nil,
}
Expand Down Expand Up @@ -158,15 +158,15 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
}

switch update.newState {
case storagemarket.DealUnknown: // new
c.handle(ctx, deal, c.new, storagemarket.DealAccepted)
case storagemarket.DealAccepted:
c.handle(ctx, deal, c.accepted, storagemarket.DealStaged)
case storagemarket.DealStaged:
c.handle(ctx, deal, c.staged, storagemarket.DealSealing)
case storagemarket.DealSealing:
c.handle(ctx, deal, c.sealing, storagemarket.DealNoUpdate)
// TODO: DealComplete -> watch for faults, expiration, etc.
case storagemarket.StorageDealUnknown: // new
c.handle(ctx, deal, c.new, storagemarket.StorageDealProposalAccepted)
case storagemarket.StorageDealProposalAccepted:
c.handle(ctx, deal, c.accepted, storagemarket.StorageDealStaged)
case storagemarket.StorageDealStaged:
c.handle(ctx, deal, c.staged, storagemarket.StorageDealSealing)
case storagemarket.StorageDealSealing:
c.handle(ctx, deal, c.sealing, storagemarket.StorageDealNoUpdate)
// TODO: StorageDealActive -> watch for faults, expiration, etc.
}
}

Expand Down Expand Up @@ -233,7 +233,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
ClientDeal: storagemarket.ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: *dealProposal,
State: storagemarket.DealUnknown,
State: storagemarket.StorageDealUnknown,
Miner: p.MinerID,
MinerWorker: p.MinerWorker,
PayloadCid: p.Data,
Expand Down
10 changes: 5 additions & 5 deletions storagemarket/impl/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (

type clientHandlerFunc func(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error)

func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next storagemarket.DealState) {
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next storagemarket.StorageDealStatus) {
go func() {
mut, err := cb(ctx, deal)
if err != nil {
next = storagemarket.DealError
next = storagemarket.StorageDealError
}

if err == nil && next == storagemarket.DealNoUpdate {
if err == nil && next == storagemarket.StorageDealNoUpdate {
return
}

Expand Down Expand Up @@ -47,7 +47,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), e
}

/* data transfer happens */
if resp.State != storagemarket.DealAccepted {
if resp.State != storagemarket.StorageDealProposalAccepted {
return nil, xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal
cb := func(err error) {
select {
case c.updated <- clientDealUpdate{
newState: storagemarket.DealComplete,
newState: storagemarket.StorageDealActive,
id: deal.ProposalCid,
err: err,
}:
Expand Down
44 changes: 22 additions & 22 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Provider struct {
}

type minerDealUpdate struct {
newState storagemarket.DealState
newState storagemarket.StorageDealStatus
id cid.Cid
err error
mut func(*MinerDeal)
Expand Down Expand Up @@ -139,9 +139,9 @@ func (p *Provider) Run(ctx context.Context, host host.Host) {

for {
select {
case deal := <-p.incoming: // DealAccepted
case deal := <-p.incoming:
p.onIncoming(deal)
case update := <-p.updated: // DealStaged
case update := <-p.updated:
p.onUpdated(ctx, update)
case <-p.stop:
return
Expand All @@ -164,7 +164,7 @@ func (p *Provider) onIncoming(deal MinerDeal) {

go func() {
p.updated <- minerDealUpdate{
newState: storagemarket.DealValidating,
newState: storagemarket.StorageDealValidating,
id: deal.ProposalCid,
err: nil,
}
Expand Down Expand Up @@ -193,20 +193,20 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
}

switch update.newState {
case storagemarket.DealValidating:
p.handle(ctx, deal, p.validating, storagemarket.DealTransferring)
case storagemarket.DealTransferring:
p.handle(ctx, deal, p.transferring, storagemarket.DealNoUpdate)
case storagemarket.DealVerifyData:
p.handle(ctx, deal, p.verifydata, storagemarket.DealPublishing)
case storagemarket.DealPublishing:
p.handle(ctx, deal, p.publishing, storagemarket.DealStaged)
case storagemarket.DealStaged:
p.handle(ctx, deal, p.staged, storagemarket.DealSealing)
case storagemarket.DealSealing:
p.handle(ctx, deal, p.sealing, storagemarket.DealComplete)
case storagemarket.DealComplete:
p.handle(ctx, deal, p.complete, storagemarket.DealNoUpdate)
case storagemarket.StorageDealValidating:
p.handle(ctx, deal, p.validating, storagemarket.StorageDealTransferring)
case storagemarket.StorageDealTransferring:
p.handle(ctx, deal, p.transferring, storagemarket.StorageDealNoUpdate)
case storagemarket.StorageDealVerifyData:
p.handle(ctx, deal, p.verifydata, storagemarket.StorageDealPublishing)
case storagemarket.StorageDealPublishing:
p.handle(ctx, deal, p.publishing, storagemarket.StorageDealStaged)
case storagemarket.StorageDealStaged:
p.handle(ctx, deal, p.staged, storagemarket.StorageDealSealing)
case storagemarket.StorageDealSealing:
p.handle(ctx, deal, p.sealing, storagemarket.StorageDealActive)
case storagemarket.StorageDealActive:
p.handle(ctx, deal, p.complete, storagemarket.StorageDealNoUpdate)
}
}

Expand All @@ -223,14 +223,14 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da
}

// data transfer events for opening and progress do not affect deal state
var next storagemarket.DealState
var next storagemarket.StorageDealStatus
var err error
var mut func(*MinerDeal)
switch event.Code {
case datatransfer.Complete:
next = storagemarket.DealVerifyData
next = storagemarket.StorageDealVerifyData
case datatransfer.Error:
next = storagemarket.DealFailed
next = storagemarket.StorageDealFailing
err = ErrDataTransferFailed
default:
// the only events we care about are complete and error
Expand Down Expand Up @@ -259,7 +259,7 @@ func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error)
Client: s.Conn().RemotePeer(),
Proposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(),
State: storagemarket.DealUnknown,
State: storagemarket.StorageDealUnknown,

Ref: proposal.Piece,
},
Expand Down
16 changes: 8 additions & 8 deletions storagemarket/impl/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (

type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)

func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandlerFunc, next storagemarket.DealState) {
func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandlerFunc, next storagemarket.StorageDealStatus) {
go func() {
mut, err := cb(ctx, deal)

if err == nil && next == storagemarket.DealNoUpdate {
if err == nil && next == storagemarket.StorageDealNoUpdate {
return
}

Expand All @@ -37,7 +37,7 @@ func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandle
}()
}

// DealValidating
// StorageDealValidating
func (p *Provider) validating(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
head, err := p.spn.MostRecentStateId(ctx)
if err != nil {
Expand Down Expand Up @@ -74,7 +74,7 @@ func (p *Provider) validating(ctx context.Context, deal MinerDeal) (func(*MinerD
return nil, nil
}

// State: DealTransferring
// State: StorageDealTransferring
func (p *Provider) transferring(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())

Expand All @@ -101,7 +101,7 @@ func (p *Provider) transferring(ctx context.Context, deal MinerDeal) (func(*Mine
return nil, nil
}

// State: DealVerifyData
// State: StorageDealVerifyData
func (p *Provider) verifydata(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
// entire DAG selector
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
Expand All @@ -123,7 +123,7 @@ func (p *Provider) verifydata(ctx context.Context, deal MinerDeal) (func(*MinerD
}, nil
}

// State: DealPublishing
// State: StorageDealPublishing
func (p *Provider) publishing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
waddr, err := p.spn.GetMinerWorker(ctx, deal.Proposal.Provider)
if err != nil {
Expand All @@ -150,7 +150,7 @@ func (p *Provider) publishing(ctx context.Context, deal MinerDeal) (func(*MinerD
}

err = p.sendSignedResponse(ctx, &Response{
State: storagemarket.DealAccepted,
State: storagemarket.StorageDealProposalAccepted,

Proposal: deal.ProposalCid,
PublishMessage: &mcid,
Expand Down Expand Up @@ -205,7 +205,7 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
cb := func(err error) {
select {
case p.updated <- minerDealUpdate{
newState: storagemarket.DealComplete,
newState: storagemarket.StorageDealActive,
id: deal.ProposalCid,
err: err,
}:
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/provider_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {
log.Warnf("deal %s failed: %s", id, cerr)

err := p.sendSignedResponse(ctx, &Response{
State: storagemarket.DealFailed,
State: storagemarket.StorageDealFailing,
Message: cerr.Error(),
Proposal: id,
})
Expand Down
20 changes: 10 additions & 10 deletions storagemarket/impl/request_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func uniqueStorageDealProposal() (storagemarket.StorageDealProposal, error) {
}, nil
}

func newClientDeal(minerID peer.ID, state storagemarket.DealState) (deals.ClientDeal, error) {
func newClientDeal(minerID peer.ID, state storagemarket.StorageDealStatus) (deals.ClientDeal, error) {
newProposal, err := uniqueStorageDealProposal()
if err != nil {
return deals.ClientDeal{}, err
Expand All @@ -84,7 +84,7 @@ func newClientDeal(minerID peer.ID, state storagemarket.DealState) (deals.Client
}, nil
}

func newMinerDeal(clientID peer.ID, state storagemarket.DealState) (deals.MinerDeal, error) {
func newMinerDeal(clientID peer.ID, state storagemarket.StorageDealStatus) (deals.MinerDeal, error) {
newProposal, err := uniqueStorageDealProposal()
if err != nil {
return deals.MinerDeal{}, err
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestClientRequestValidation(t *testing.T) {
})
t.Run("ValidatePull fails wrong client", func(t *testing.T) {
otherMiner := peer.ID("otherminer")
clientDeal, err := newClientDeal(otherMiner, storagemarket.DealAccepted)
clientDeal, err := newClientDeal(otherMiner, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -150,7 +150,7 @@ func TestClientRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePull fails wrong piece ref", func(t *testing.T) {
clientDeal, err := newClientDeal(minerID, storagemarket.DealAccepted)
clientDeal, err := newClientDeal(minerID, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -162,7 +162,7 @@ func TestClientRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePull fails wrong deal state", func(t *testing.T) {
clientDeal, err := newClientDeal(minerID, storagemarket.DealComplete)
clientDeal, err := newClientDeal(minerID, storagemarket.StorageDealActive)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -175,7 +175,7 @@ func TestClientRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePull succeeds", func(t *testing.T) {
clientDeal, err := newClientDeal(minerID, storagemarket.DealAccepted)
clientDeal, err := newClientDeal(minerID, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestProviderRequestValidation(t *testing.T) {
})
t.Run("ValidatePush fails wrong miner", func(t *testing.T) {
otherClient := peer.ID("otherclient")
minerDeal, err := newMinerDeal(otherClient, storagemarket.DealAccepted)
minerDeal, err := newMinerDeal(otherClient, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -234,7 +234,7 @@ func TestProviderRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePush fails wrong piece ref", func(t *testing.T) {
minerDeal, err := newMinerDeal(clientID, storagemarket.DealAccepted)
minerDeal, err := newMinerDeal(clientID, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -246,7 +246,7 @@ func TestProviderRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePush fails wrong deal state", func(t *testing.T) {
minerDeal, err := newMinerDeal(clientID, storagemarket.DealComplete)
minerDeal, err := newMinerDeal(clientID, storagemarket.StorageDealActive)
if err != nil {
t.Fatal("error creating client deal")
}
Expand All @@ -259,7 +259,7 @@ func TestProviderRequestValidation(t *testing.T) {
}
})
t.Run("ValidatePush succeeds", func(t *testing.T) {
minerDeal, err := newMinerDeal(clientID, storagemarket.DealAccepted)
minerDeal, err := newMinerDeal(clientID, storagemarket.StorageDealProposalAccepted)
if err != nil {
t.Fatal("error creating client deal")
}
Expand Down
6 changes: 3 additions & 3 deletions storagemarket/impl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
ErrInacceptableDealState = errors.New("deal is not a in a state where deals are accepted.")

// DataTransferStates are the states in which it would make sense to actually start a data transfer
DataTransferStates = []storagemarket.DealState{storagemarket.DealAccepted, storagemarket.DealUnknown}
DataTransferStates = []storagemarket.StorageDealStatus{storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealUnknown}
)

type Proposal struct {
Expand All @@ -50,13 +50,13 @@ type Proposal struct {
}

type Response struct {
State storagemarket.DealState
State storagemarket.StorageDealStatus

// DealProposalRejected
Message string
Proposal cid.Cid

// DealAccepted
// StorageDealProposalAccepted
PublishMessage *cid.Cid
}

Expand Down
Loading