Skip to content

Commit

Permalink
fix: track payment channel by ID instead of from/to
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 6, 2020
1 parent b3bdd2b commit 03a258b
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 80 deletions.
39 changes: 2 additions & 37 deletions paychmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/ipfs/go-datastore"

"golang.org/x/sync/errgroup"

xerrors "golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -103,42 +101,9 @@ func HandleManager(lc fx.Lifecycle, pm *Manager) {
})
}

// Start checks the datastore to see if there are any channels that have
// outstanding add funds messages, and if so, waits on the messages.
// Outstanding messages can occur if an add funds message was sent
// and then lotus was shut down or crashed before the result was
// received.
// Start restarts tracking of any messages that were sent to chain.
func (pm *Manager) Start() error {
cis, err := pm.store.WithPendingAddFunds()
if err != nil {
return err
}

group := errgroup.Group{}
for _, chanInfo := range cis {
ci := chanInfo
if ci.CreateMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
}
go ca.waitForPaychCreateMsg(ci.Control, ci.Target, *ci.CreateMsg, nil)
return nil
})
} else if ci.AddFundsMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByAddress(*ci.Channel)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
}
go ca.waitForAddFundsMsg(ci.Control, ci.Target, *ci.AddFundsMsg, nil)
return nil
})
}
}

return group.Wait()
return pm.restartPending()
}

// Stop shuts down any processes used by the manager
Expand Down
2 changes: 1 addition & 1 deletion paychmgr/paychget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {

// 2. Should block until create channel has completed.
// Because first channel create fails, this request
// should be for channel create.
// should be for channel create again.
amt2 := big.NewInt(5)
ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions paychmgr/settle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ func TestPaychSettle(t *testing.T) {
ch2, err := mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
require.NotEqual(t, ch, ch2)

// There should now be two channels
cis, err := mgr.ListChannels()
require.NoError(t, err)
require.Len(t, cis, 2)
}
99 changes: 67 additions & 32 deletions paychmgr/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/filecoin-project/lotus/api"

"github.com/filecoin-project/specs-actors/actors/abi/big"
Expand Down Expand Up @@ -191,9 +193,7 @@ func (ca *channelAccessor) processTask(
}

// If a channel has not yet been created, create one.
// Note that if the previous attempt to create the channel failed because of a VM error
// (eg not enough gas), both channelInfo.Channel and channelInfo.CreateMsg will be nil.
if channelInfo == nil || channelInfo.Channel == nil && channelInfo.CreateMsg == nil {
if channelInfo == nil {
mcid, err := ca.createPaych(ctx, from, to, amt, onComplete)
if err != nil {
return &paychFundsRes{err: err}
Expand All @@ -218,7 +218,7 @@ func (ca *channelAccessor) processTask(

// We need to add more funds, so send an add funds message to
// cover the amount for this request
mcid, err := ca.addFunds(ctx, from, to, amt, onComplete)
mcid, err := ca.addFunds(ctx, channelInfo, amt, onComplete)
if err != nil {
return &paychFundsRes{err: err}
}
Expand Down Expand Up @@ -257,43 +257,45 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
mcid := smsg.Cid()

// Create a new channel in the store
if _, err := ca.store.createChannel(from, to, mcid, amt); err != nil {
ci, err := ca.store.CreateChannel(from, to, mcid, amt)
if err != nil {
log.Errorf("creating channel: %s", err)
return cid.Undef, err
}

// Wait for the channel to be created on chain
go ca.waitForPaychCreateMsg(from, to, mcid, cb)
go ca.waitForPaychCreateMsg(ci.ChannelID, mcid, cb)

return mcid, nil
}

// waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the
// created payment channel
func (ca *channelAccessor) waitForPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) {
err := ca.waitPaychCreateMsg(from, to, mcid)
func (ca *channelAccessor) waitForPaychCreateMsg(channelID string, mcid cid.Cid, cb onCompleteFn) {
err := ca.waitPaychCreateMsg(channelID, mcid)
ca.msgWaitComplete(mcid, err, cb)
}

func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid) error {
func (ca *channelAccessor) waitPaychCreateMsg(channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence)
if err != nil {
log.Errorf("wait msg: %w", err)
return err
}

// If channel creation failed
if mwait.Receipt.ExitCode != 0 {
err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)

ca.lk.Lock()
defer ca.lk.Unlock()

ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.CreateMsg = nil
})
// Channel creation failed, so remove the channel from the datastore
dserr := ca.store.RemoveChannel(channelID)
if dserr != nil {
log.Errorf("failed to remove channel %s: %s", channelID, dserr)
}

err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)
return err
}

Expand All @@ -308,7 +310,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A
defer ca.lk.Unlock()

// Store robust address of channel
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
channelInfo.Channel = &decodedReturn.RobustAddress
channelInfo.Amount = channelInfo.PendingAmount
channelInfo.PendingAmount = big.NewInt(0)
Expand All @@ -319,12 +321,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A
}

// addFunds sends a message to add funds to the channel and returns the message cid
func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, to address.Address, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) {
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to)
if err != nil {
return nil, err
}

func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) {
msg := &types.Message{
To: *channelInfo.Channel,
From: channelInfo.Control,
Expand All @@ -341,7 +338,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t
mcid := smsg.Cid()

// Store the add funds message CID on the channel
ca.mutateChannelInfo(from, to, func(ci *ChannelInfo) {
ca.mutateChannelInfo(channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.PendingAmount = amt
ci.AddFundsMsg = &mcid
})
Expand All @@ -353,18 +350,18 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t
log.Errorf("saving add funds message CID %s: %s", mcid, err)
}

go ca.waitForAddFundsMsg(from, to, mcid, cb)
go ca.waitForAddFundsMsg(channelInfo.ChannelID, mcid, cb)

return &mcid, nil
}

// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
func (ca *channelAccessor) waitForAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) {
err := ca.waitAddFundsMsg(from, to, mcid)
func (ca *channelAccessor) waitForAddFundsMsg(channelID string, mcid cid.Cid, cb onCompleteFn) {
err := ca.waitAddFundsMsg(channelID, mcid)
ca.msgWaitComplete(mcid, err, cb)
}

func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid) error {
func (ca *channelAccessor) waitAddFundsMsg(channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence)
if err != nil {
log.Error(err)
Expand All @@ -378,7 +375,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
ca.lk.Lock()
defer ca.lk.Unlock()

ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})
Expand All @@ -390,7 +387,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
defer ca.lk.Unlock()

// Store updated amount
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
Expand All @@ -400,8 +397,8 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
}

// Change the state of the channel in the store
func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Address, mutate func(*ChannelInfo)) {
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to)
func (ca *channelAccessor) mutateChannelInfo(channelID string, mutate func(*ChannelInfo)) {
channelInfo, err := ca.store.ByChannelID(channelID)

// If there's an error reading or writing to the store just log an error.
// For now we're assuming it's unlikely to happen in practice.
Expand All @@ -420,6 +417,44 @@ func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Ad
}
}

// restartPending checks the datastore to see if there are any channels that
// have outstanding create / add funds messages, and if so, waits on the
// messages.
// Outstanding messages can occur if a create / add funds message was sent and
// then the system was shut down or crashed before the result was received.
func (pm *Manager) restartPending() error {
cis, err := pm.store.WithPendingAddFunds()
if err != nil {
return err
}

group := errgroup.Group{}
for _, chanInfo := range cis {
ci := chanInfo
if ci.CreateMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
}
go ca.waitForPaychCreateMsg(ci.ChannelID, *ci.CreateMsg, nil)
return nil
})
} else if ci.AddFundsMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByAddress(*ci.Channel)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
}
go ca.waitForAddFundsMsg(ci.ChannelID, *ci.AddFundsMsg, nil)
return nil
})
}
}

return group.Wait()
}

// getPaychWaitReady waits for a the response to the message with the given cid
func (ca *channelAccessor) getPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
ca.lk.Lock()
Expand Down
41 changes: 31 additions & 10 deletions paychmgr/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ps *Store) findChans(filter func(*ChannelInfo) bool, max int) ([]ChannelIn
return nil, err
}

ci, err := unmarshallChannelInfo(&stored, res)
ci, err := unmarshallChannelInfo(&stored, res.Value)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -313,11 +313,25 @@ func (ps *Store) WithPendingAddFunds() ([]ChannelInfo, error) {
}, 0)
}

// createChannel creates an outbound channel for the given from / to, ensuring
// ByChannelID gets channel info by channel ID
func (ps *Store) ByChannelID(channelID string) (*ChannelInfo, error) {
var stored ChannelInfo

res, err := ps.ds.Get(dskeyForChannel(channelID))
if err != nil {
if err == datastore.ErrNotFound {
return nil, ErrChannelNotTracked
}
return nil, err
}

return unmarshallChannelInfo(&stored, res)
}

// CreateChannel creates an outbound channel for the given from / to, ensuring
// it has a higher sequence number than any existing channel with the same from / to
func (ps *Store) createChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
func (ps *Store) CreateChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
ci := &ChannelInfo{
ChannelID: uuid.New().String(),
Direction: DirOutbound,
NextLane: 0,
Control: from,
Expand All @@ -341,15 +355,22 @@ func (ps *Store) createChannel(from address.Address, to address.Address, createM
return ci, err
}

// RemoveChannel removes the channel with the given channel ID
func (ps *Store) RemoveChannel(channelID string) error {
return ps.ds.Delete(dskeyForChannel(channelID))
}

// The datastore key used to identify the channel info
func dskeyForChannel(ci *ChannelInfo) datastore.Key {
chanKey := fmt.Sprintf("%s->%s", ci.Control.String(), ci.Target.String())
return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, chanKey})
func dskeyForChannel(channelID string) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, channelID})
}

// putChannelInfo stores the channel info in the datastore
func (ps *Store) putChannelInfo(ci *ChannelInfo) error {
k := dskeyForChannel(ci)
if len(ci.ChannelID) == 0 {
ci.ChannelID = uuid.New().String()
}
k := dskeyForChannel(ci.ChannelID)

b, err := marshallChannelInfo(ci)
if err != nil {
Expand Down Expand Up @@ -380,8 +401,8 @@ func marshallChannelInfo(ci *ChannelInfo) ([]byte, error) {
return cborrpc.Dump(ci)
}

func unmarshallChannelInfo(stored *ChannelInfo, res dsq.Result) (*ChannelInfo, error) {
if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
func unmarshallChannelInfo(stored *ChannelInfo, value []byte) (*ChannelInfo, error) {
if err := stored.UnmarshalCBOR(bytes.NewReader(value)); err != nil {
return nil, err
}

Expand Down

0 comments on commit 03a258b

Please sign in to comment.