Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: high priority processing of economic activity #7483

Merged
merged 5 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion golang/cosmos/ante/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ type FeegrantKeeper interface {
}

type SwingsetKeeper interface {
ActionQueueLength(ctx sdk.Context) (int32, error)
InboundQueueLength(ctx sdk.Context) (int32, error)
GetState(ctx sdk.Context) swingtypes.State
}
36 changes: 26 additions & 10 deletions golang/cosmos/ante/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ import (
)

/*
This AnteDecorator enforces a limit on the size of the Swingset
inbound queue by scanning for Cosmos-messages which add Swingset-messages
to that queue. Note that when running DeliverTx, inbound messages
are staged in the actionQueue, then transferred to the inbound queue
in end-block processing. Previous Txs in the block will have already
been added to the actionQueue, so we must reject Txs which would
grow the actionQueue beyond the allowed inbound size.
This AnteDecorator enforces a limit on the size of the Swingset inbound queue
by scanning for Cosmos-messages which end up on Swingset's message queues. Note
that when running DeliverTx, inbound messages are placed in either the
actionQueue or the highPriorityQueue (forming the Swingset inbound queue),
and kept there until processed by SwingSet. Previous Txs in the block will have
already been added to the inbound queue, so we must reject Txs which would grow
the actionQueue beyond the allowed inbound size. Additions to the
highPriorityQueue are exempt of inbound queue size checks but count towards the
overall inbound queue size, which means the inbound queue can "overflow" its
limits when adding high priority actions.

We would like to reject messages during mempool admission (CheckTx)
rather than during block execution (DeliverTx), but at CheckTx time
we don't know how many messages will be allowed at DeliverTx time,
nor the size of the actionQueue from preceding Txs in the block.
To mitigate this, Swingset should implement hysteresis by computing
nor the size of the inbound queue from preceding Txs in the block.
To mitigate this, x/swingset implements an hysteresis by computing
the number of messages allowed for mempool admission as if its max
queue length was lower (e.g. 50%). This is the QueueInboundMempool
entry in the Swingset state QueueAllowed field. At DeliverTx time
Expand Down Expand Up @@ -66,8 +69,14 @@ func (ia inboundAnte) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next
return ctx, err
}
}
isHighPriority, err := ia.isPriorityMessage(ctx, msg)
if err != nil {
return ctx, err
}
if inboundsAllowed >= inbounds {
inboundsAllowed -= inbounds
} else if isHighPriority {
inboundsAllowed = 0
} else {
defer func() {
telemetry.IncrCounterWithLabels(
Expand All @@ -84,6 +93,13 @@ func (ia inboundAnte) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next
return next(ctx, tx, simulate)
}

func (ia inboundAnte) isPriorityMessage(ctx sdk.Context, msg sdk.Msg) (bool, error) {
if c, ok := msg.(vm.ControllerAdmissionMsg); ok {
return c.IsHighPriority(ctx, ia.sk)
}
return false, nil
}

// allowedInbound returns the allowed number of inbound queue messages or an error.
// Look up the limit from the swingset state queue sizes: from QueueInboundMempool
// if we're running CheckTx (for the hysteresis described above), otherwise QueueAllowed.
Expand All @@ -98,7 +114,7 @@ func (ia inboundAnte) allowedInbound(ctx sdk.Context) (int32, error) {
// if number of allowed entries not given, fail closed
return 0, nil
}
actions, err := ia.sk.ActionQueueLength(ctx)
actions, err := ia.sk.InboundQueueLength(ctx)
if err != nil {
return 0, err
}
Expand Down
162 changes: 101 additions & 61 deletions golang/cosmos/ante/inbound_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ante

import (
"context"
"fmt"
"reflect"
"testing"
Expand All @@ -15,15 +16,16 @@ import (

func TestInboundAnteHandle(t *testing.T) {
for _, tt := range []struct {
name string
checkTx bool
tx sdk.Tx
simulate bool
actionQueueLength int32
actionQueueLengthErr error
inboundLimit int32
mempoolLimit int32
errMsg string
name string
checkTx bool
tx sdk.Tx
simulate bool
inboundQueueLength int32
inboundQueueLengthErr error
inboundLimit int32
mempoolLimit int32
errMsg string
isHighPriorityOwner bool
}{
{
name: "empty-empty",
Expand All @@ -35,50 +37,50 @@ func TestInboundAnteHandle(t *testing.T) {
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "has-room",
tx: makeTestTx(&swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
actionQueueLength: 8,
name: "has-room",
tx: makeTestTx(&swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
inboundQueueLength: 8,
},
{
name: "at-limit",
tx: makeTestTx(&swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
actionQueueLength: 9,
name: "at-limit",
tx: makeTestTx(&swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
inboundQueueLength: 9,
},
{
name: "no-room",
tx: makeTestTx(&swingtypes.MsgProvision{}),
inboundLimit: 10,
actionQueueLength: 10,
errMsg: ErrInboundQueueFull.Error(),
name: "no-room",
tx: makeTestTx(&swingtypes.MsgProvision{}),
inboundLimit: 10,
inboundQueueLength: 10,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "state-lookup-error",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgWalletSpendAction{}),
inboundLimit: 10,
actionQueueLengthErr: fmt.Errorf("sunspots"),
errMsg: "sunspots",
name: "state-lookup-error",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgWalletSpendAction{}),
inboundLimit: 10,
inboundQueueLengthErr: fmt.Errorf("sunspots"),
errMsg: "sunspots",
},
{
name: "allow-non-swingset-msgs",
tx: makeTestTx(&banktypes.MsgSend{}, &banktypes.MsgSend{}),
inboundLimit: 1,
},
{
name: "lazy-queue-length-lookup",
tx: makeTestTx(&banktypes.MsgSend{}, &banktypes.MsgSend{}),
inboundLimit: 1,
actionQueueLengthErr: fmt.Errorf("sunspots"),
name: "lazy-queue-length-lookup",
tx: makeTestTx(&banktypes.MsgSend{}, &banktypes.MsgSend{}),
inboundLimit: 1,
inboundQueueLengthErr: fmt.Errorf("sunspots"),
},
{
name: "checktx",
checkTx: true,
tx: makeTestTx(&swingtypes.MsgDeliverInbound{}),
inboundLimit: 10,
mempoolLimit: 5,
actionQueueLength: 7,
errMsg: ErrInboundQueueFull.Error(),
name: "checktx",
checkTx: true,
tx: makeTestTx(&swingtypes.MsgDeliverInbound{}),
inboundLimit: 10,
mempoolLimit: 5,
inboundQueueLength: 7,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "empty-queue-allowed",
Expand All @@ -87,32 +89,56 @@ func TestInboundAnteHandle(t *testing.T) {
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "already-full",
tx: makeTestTx(&swingtypes.MsgProvision{}),
inboundLimit: 10,
actionQueueLength: 10,
errMsg: ErrInboundQueueFull.Error(),
name: "already-full",
tx: makeTestTx(&swingtypes.MsgProvision{}),
inboundLimit: 10,
inboundQueueLength: 10,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "max-per-tx",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgWalletSpendAction{}),
inboundLimit: 10,
actionQueueLength: 5,
errMsg: ErrInboundQueueFull.Error(),
name: "max-per-tx",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgWalletSpendAction{}),
inboundLimit: 10,
inboundQueueLength: 5,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "priority-limit-bypass",
tx: makeTestTx(&swingtypes.MsgWalletSpendAction{}),
isHighPriorityOwner: true,
},
{
name: "priority-multi-bypass",
tx: makeTestTx(&swingtypes.MsgWalletSpendAction{}, &swingtypes.MsgWalletSpendAction{}),
isHighPriorityOwner: true,
},
{
name: "mixed-priority-limit-first-fail",
tx: makeTestTx(&swingtypes.MsgWalletSpendAction{}, &swingtypes.MsgProvision{}),
isHighPriorityOwner: true,
inboundLimit: 1,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "mixed-priority-limit-last-succeed",
tx: makeTestTx(&swingtypes.MsgProvision{}, &swingtypes.MsgWalletSpendAction{}),
isHighPriorityOwner: true,
inboundLimit: 1,
},
} {
t.Run(tt.name, func(t *testing.T) {
ctx := sdk.Context{}.WithIsCheckTx(tt.checkTx)
ctx := sdk.Context{}.WithContext(context.Background()).WithIsCheckTx(tt.checkTx)
emptyQueueAllowed := false
if tt.inboundLimit == -1 {
emptyQueueAllowed = true
}
mock := mockSwingsetKeeper{
actionQueueLength: tt.actionQueueLength,
actionQueueLengthErr: tt.actionQueueLengthErr,
inboundLimit: tt.inboundLimit,
mempoolLimit: tt.mempoolLimit,
emptyQueueAllowed: emptyQueueAllowed,
inboundQueueLength: tt.inboundQueueLength,
inboundQueueLengthErr: tt.inboundQueueLengthErr,
inboundLimit: tt.inboundLimit,
mempoolLimit: tt.mempoolLimit,
emptyQueueAllowed: emptyQueueAllowed,
isHighPriorityOwner: tt.isHighPriorityOwner,
}
decorator := NewInboundDecorator(mock)
newCtx, err := decorator.AnteHandle(ctx, tt.tx, tt.simulate, nilAnteHandler)
Expand Down Expand Up @@ -153,17 +179,19 @@ func nilAnteHandler(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Conte
}

type mockSwingsetKeeper struct {
actionQueueLength int32
actionQueueLengthErr error
inboundLimit int32
mempoolLimit int32
emptyQueueAllowed bool
inboundQueueLength int32
inboundQueueLengthErr error
inboundLimit int32
mempoolLimit int32
emptyQueueAllowed bool
isHighPriorityOwner bool
}

var _ SwingsetKeeper = mockSwingsetKeeper{}
var _ swingtypes.SwingSetKeeper = mockSwingsetKeeper{}

func (msk mockSwingsetKeeper) ActionQueueLength(ctx sdk.Context) (int32, error) {
return msk.actionQueueLength, msk.actionQueueLengthErr
func (msk mockSwingsetKeeper) InboundQueueLength(ctx sdk.Context) (int32, error) {
return msk.inboundQueueLength, msk.inboundQueueLengthErr
}

func (msk mockSwingsetKeeper) GetState(ctx sdk.Context) swingtypes.State {
Expand All @@ -177,3 +205,15 @@ func (msk mockSwingsetKeeper) GetState(ctx sdk.Context) swingtypes.State {
},
}
}

func (msk mockSwingsetKeeper) IsHighPriorityAddress(ctx sdk.Context, addr sdk.AccAddress) (bool, error) {
return msk.isHighPriorityOwner, nil
}

func (msk mockSwingsetKeeper) GetBeansPerUnit(ctx sdk.Context) map[string]sdk.Uint {
return nil
}

func (msk mockSwingsetKeeper) ChargeBeans(ctx sdk.Context, addr sdk.AccAddress, beans sdk.Uint) error {
return fmt.Errorf("not implemented")
}
5 changes: 5 additions & 0 deletions golang/cosmos/vm/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type ControllerAdmissionMsg interface {
// GetInboundMsgCount returns the number of Swingset messages which will
// be added to the inboundQueue.
GetInboundMsgCount() int32

// IsHighPriority returns whether the message should be considered for
// high priority processing, including bypass of some inbound checks
// and queueing on higher priority queues.
IsHighPriority(sdk.Context, interface{}) (bool, error)
}

// Jsonable is a value, j, that can be passed through json.Marshal(j).
Expand Down
Loading