Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes locking issues when creating channel / adding funds #2640

Merged
merged 10 commits into from
Aug 6, 2020
3 changes: 2 additions & 1 deletion api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ type FullNode interface {
// MethodGroup: Paych
// The Paych methods are for interacting with and managing payment channels

PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error)
PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychSettle(context.Context, address.Address) (cid.Cid, error)
Expand Down
11 changes: 8 additions & 3 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ type FullNodeStruct struct {

MarketEnsureAvailable func(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`

PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*api.PaychStatus, error) `perm:"read"`
PaychSettle func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
Expand Down Expand Up @@ -803,8 +804,12 @@ func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr, wallet
return c.Internal.MarketEnsureAvailable(ctx, addr, wallet, amt)
}

func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, amt)
}

func (c *FullNodeStruct) PaychGetWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
return c.Internal.PaychGetWaitReady(ctx, mcid)
}

func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) {
Expand Down
74 changes: 41 additions & 33 deletions api/test/paych.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package test

import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/specs-actors/actors/builtin"
"os"
"sync/atomic"
"testing"
"time"

"github.com/filecoin-project/specs-actors/actors/builtin"

"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/ipfs/go-cid"

Expand All @@ -28,8 +27,6 @@ import (
)

func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Skip("fixme")

_ = os.Setenv("BELLMAN_NO_GPU", "1")

ctx := context.Background()
Expand Down Expand Up @@ -77,13 +74,10 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}

res := waitForMessage(ctx, t, paymentCreator, channelInfo.ChannelMessage, time.Second, "channel create")
var params initactor.ExecReturn
err = params.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.ChannelMessage)
if err != nil {
t.Fatal(err)
}
channel := params.RobustAddress

// allocate three lanes
var lanes []uint64
Expand Down Expand Up @@ -129,16 +123,11 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}

res = waitForMessage(ctx, t, paymentCreator, settleMsgCid, time.Second*10, "settle")
res := waitForMessage(ctx, t, paymentCreator, settleMsgCid, time.Second*10, "settle")
if res.Receipt.ExitCode != 0 {
t.Fatal("Unable to settle payment channel")
}

creatorPreCollectBalance, err := paymentCreator.WalletBalance(ctx, createrAddr)
if err != nil {
t.Fatal(err)
}

// wait for the receiver to submit their vouchers
ev := events.NewEvents(ctx, paymentCreator)
preds := state.NewStatePredicates(paymentCreator)
Expand Down Expand Up @@ -172,32 +161,20 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal("Timed out waiting for receiver to submit vouchers")
}

atomic.StoreInt64(&bm.nulls, paych.SettleDelay)
// wait for the settlement period to pass before collecting
waitForBlocks(ctx, t, bm, paymentReceiver, receiverAddr, paych.SettleDelay)

{
// wait for a block
m, err := paymentCreator.MpoolPushMessage(ctx, &types.Message{
To: builtin.BurntFundsActorAddr,
From: createrAddr,
Value: types.NewInt(0),
GasPrice: big.Zero(),
})
if err != nil {
t.Fatal(err)
}

_, err = paymentCreator.StateWaitMsg(ctx, m.Cid(), 3)
if err != nil {
t.Fatal(err)
}
creatorPreCollectBalance, err := paymentCreator.WalletBalance(ctx, createrAddr)
if err != nil {
t.Fatal(err)
}

// collect funds (from receiver, though either party can do it)
collectMsg, err := paymentReceiver.PaychCollect(ctx, channel)
if err != nil {
t.Fatal(err)
}
res, err = paymentReceiver.StateWaitMsg(ctx, collectMsg, 1)
res, err = paymentReceiver.StateWaitMsg(ctx, collectMsg, 3)
if err != nil {
t.Fatal(err)
}
Expand All @@ -213,6 +190,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {

// The highest nonce voucher that the creator sent on each lane is 2000
totalVouchers := int64(len(lanes) * 2000)

// When receiver submits the tokens to the chain, creator should get a
// refund on the remaining balance, which is
// channel amount - total voucher value
Expand All @@ -226,6 +204,36 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
bm.stop()
}

func waitForBlocks(ctx context.Context, t *testing.T, bm *blockMiner, paymentReceiver TestNode, receiverAddr address.Address, count int) {
// We need to add null blocks in batches, if we add too many the chain can't sync
batchSize := 60
for i := 0; i < count; i += batchSize {
size := batchSize
if i > count {
size = count - i
}

// Add a batch of null blocks
atomic.StoreInt64(&bm.nulls, int64(size-1))

// Add a real block
m, err := paymentReceiver.MpoolPushMessage(ctx, &types.Message{
To: builtin.BurntFundsActorAddr,
From: receiverAddr,
Value: types.NewInt(0),
GasPrice: big.Zero(),
})
if err != nil {
t.Fatal(err)
}

_, err = paymentReceiver.StateWaitMsg(ctx, m.Cid(), 1)
if err != nil {
t.Fatal(err)
}
}
}

func waitForMessage(ctx context.Context, t *testing.T, paymentCreator TestNode, msgCid cid.Cid, duration time.Duration, desc string) *api.MsgLookup {
ctx, cancel := context.WithTimeout(ctx, duration)
defer cancel()
Expand Down
12 changes: 9 additions & 3 deletions chain/vm/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,11 @@ func (rt *Runtime) CreateActor(codeID cid.Cid, address address.Address) {
_ = rt.chargeGasSafe(gasOnActorExec)
}

func (rt *Runtime) DeleteActor(addr address.Address) {
// DeleteActor deletes the executing actor from the state tree, transferring
// any balance to beneficiary.
// Aborts if the beneficiary does not exist.
// May only be called by the actor itself.
func (rt *Runtime) DeleteActor(beneficiary address.Address) {
rt.chargeGas(rt.Pricelist().OnDeleteActor())
act, err := rt.state.GetActor(rt.Message().Receiver())
if err != nil {
Expand All @@ -304,11 +308,13 @@ func (rt *Runtime) DeleteActor(addr address.Address) {
panic(aerrors.Fatalf("failed to get actor: %s", err))
}
if !act.Balance.IsZero() {
if err := rt.vm.transfer(rt.Message().Receiver(), builtin.BurntFundsActorAddr, act.Balance); err != nil {
panic(aerrors.Fatalf("failed to transfer balance to burnt funds actor: %s", err))
// Transfer the executing actor's balance to the beneficiary
if err := rt.vm.transfer(rt.Message().Receiver(), beneficiary, act.Balance); err != nil {
panic(aerrors.Fatalf("failed to transfer balance to beneficiary actor: %s", err))
}
}

// Delete the executing actor
if err := rt.state.DeleteActor(rt.Message().Receiver()); err != nil {
panic(aerrors.Fatalf("failed to delete actor: %s", err))
}
Expand Down
2 changes: 1 addition & 1 deletion cli/paych.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var paychCmd = &cli.Command{

var paychGetCmd = &cli.Command{
Name: "get",
Usage: "Create a new payment channel or get existing one",
Usage: "Create a new payment channel or get existing one and add amount to it",
ArgsUsage: "[fromAddress toAddress amount]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 3 {
Expand Down
1 change: 1 addition & 0 deletions gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {
err = gen.WriteMapEncodersToFile("./paychmgr/cbor_gen.go", "paychmgr",
paychmgr.VoucherInfo{},
paychmgr.ChannelInfo{},
paychmgr.MsgInfo{},
)
if err != nil {
fmt.Println(err)
Expand Down
30 changes: 3 additions & 27 deletions markets/retrievaladapter/client.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package retrievaladapter

import (
"bytes"
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/specs-actors/actors/abi"
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
Expand Down Expand Up @@ -76,31 +71,12 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet
// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within
// defaultMsgWaitTimeout it returns error
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
_, mr, err := rcn.chainAPI.StateManager.WaitForMessage(context.TODO(), messageCID, build.MessageConfidence)

if err != nil {
return err
}
if mr.ExitCode != exitcode.Ok {
return xerrors.Errorf("wait for payment channel to add funds failed. exit code: %d", mr.ExitCode)
}
return nil
_, err := rcn.payAPI.PaychMgr.GetPaychWaitReady(context.TODO(), messageCID)
return err
}

func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
_, mr, err := rcn.chainAPI.StateManager.WaitForMessage(context.TODO(), messageCID, build.MessageConfidence)

if err != nil {
return address.Undef, err
}
if mr.ExitCode != exitcode.Ok {
return address.Undef, xerrors.Errorf("payment channel creation failed. exit code: %d", mr.ExitCode)
}
var retval initactor.ExecReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(mr.Return)); err != nil {
return address.Undef, err
}
return retval.RobustAddress, nil
return rcn.payAPI.PaychMgr.GetPaychWaitReady(context.TODO(), messageCID)
}

func (rcn *retrievalClientNode) GetKnownAddresses(ctx context.Context, p retrievalmarket.RetrievalPeer, encodedTs shared.TipSetToken) ([]multiaddr.Multiaddr, error) {
Expand Down
4 changes: 2 additions & 2 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"time"

"github.com/filecoin-project/lotus/markets/dealfilter"

logging "github.com/ipfs/go-log"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -111,6 +109,7 @@ const (
HandleIncomingMessagesKey

RegisterClientValidatorKey
HandlePaymentChannelManagerKey

// miner
GetParamsKey
Expand Down Expand Up @@ -279,6 +278,7 @@ func Online() Option {
Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(new(*market.FundMgr), market.StartFundManager),
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On startup we need to restart tracking of payment channel add funds requests that have already been sent to chain

Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
),

Expand Down
29 changes: 8 additions & 21 deletions node/impl/paych/paych.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type PaychAPI struct {
PaychMgr *paychmgr.Manager
}

func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, ensureFunds)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt)
if err != nil {
return nil, err
}
Expand All @@ -40,6 +40,10 @@ func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensur
}, nil
}

func (a *PaychAPI) PaychGetWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return a.PaychMgr.GetPaychWaitReady(ctx, mcid)
}

func (a *PaychAPI) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return a.PaychMgr.AllocateLane(ch)
}
Expand All @@ -66,7 +70,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
ChannelAddr: ch.Channel,

Amount: v.Amount,
Lane: uint64(lane),
Lane: lane,

Extra: v.Extra,
TimeLockMin: v.TimeLockMin,
Expand Down Expand Up @@ -108,24 +112,7 @@ func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.P
}

func (a *PaychAPI) PaychSettle(ctx context.Context, addr address.Address) (cid.Cid, error) {

ci, err := a.PaychMgr.GetChannelInfo(addr)
if err != nil {
return cid.Undef, err
}

msg := &types.Message{
To: addr,
From: ci.Control,
Value: types.NewInt(0),
Method: builtin.MethodsPaych.Settle,
}
smgs, err := a.MpoolPushMessage(ctx, msg)

if err != nil {
return cid.Undef, err
}
return smgs.Cid(), nil
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return a.PaychMgr.Settle(ctx, addr)
}

func (a *PaychAPI) PaychCollect(ctx context.Context, addr address.Address) (cid.Cid, error) {
Expand Down
Loading