Skip to content

Commit

Permalink
make execution_stage field an array of stages
Browse files Browse the repository at this point in the history
  • Loading branch information
joldie777 committed Aug 25, 2024
1 parent fd5e5e5 commit 5c3d8f1
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 195 deletions.
6 changes: 6 additions & 0 deletions proto/neutron/cron/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,31 @@ service Query {
// this line is used by starport scaffolding # 2
}

// QueryParamsRequest is the request type for the Query/Params RPC method.
message QueryParamsRequest {}

// QueryParamsResponse is the response type for the Query/Params RPC method.
message QueryParamsResponse {
// params holds all the parameters of this module.
Params params = 1 [(gogoproto.nullable) = false];
}

// QueryGetScheduleRequest is the request type for the Query/Schedule RPC method.
message QueryGetScheduleRequest {
string name = 1;
}

// QueryGetScheduleResponse is the response type for the Query/Params RPC method.
message QueryGetScheduleResponse {
Schedule schedule = 1 [(gogoproto.nullable) = false];
}

// QuerySchedulesRequest is the request type for the Query/Schedules RPC method.
message QuerySchedulesRequest {
cosmos.base.query.v1beta1.PageRequest pagination = 1;
}

// QuerySchedulesResponse is the response type for the Query/Params RPC method.
message QuerySchedulesResponse {
repeated Schedule schedules = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
Expand Down
16 changes: 10 additions & 6 deletions proto/neutron/cron/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,35 @@ option go_package = "github.com/neutron-org/neutron/v4/x/cron/types";

// ExecutionStage defines when messages will be executed in the block
enum ExecutionStage {
BEGIN_BLOCKER = 0;
END_BLOCKER = 1;
BOTH_BLOCKERS = 2;
// Execution at the end of the block
EXECUTION_STAGE_END_BLOCKER = 0;
// Execution at the beginning of the block
EXECUTION_STAGE_BEGIN_BLOCKER = 1;
}

// Schedule defines the schedule for execution
message Schedule {
// Name of schedule
string name = 1;
// Period in blocks
uint64 period = 2;
// Msgs that will be executed every period amount of time
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 3 [(gogoproto.nullable) = false];
// Last execution's block height
uint64 last_execute_height = 4;
// Execution stage when messages will be executed
ExecutionStage execution_stage = 5;
// Execution stages when messages will be executed
repeated ExecutionStage execution_stages = 5 [(gogoproto.nullable) = false];
}

// MsgExecuteContract defines the contract and the message to pass
message MsgExecuteContract {
// Contract is the address of the smart contract
string contract = 1;
// Msg is json encoded message to be passed to the contract
string msg = 2;
}

// ScheduleCount defines the number of current schedules
message ScheduleCount {
// Count is the number of current schedules
int32 count = 1;
Expand Down
9 changes: 6 additions & 3 deletions proto/neutron/cron/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ message MsgAddSchedule {

// Authority is the address of the governance account.
string authority = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// Name of the schedule
string name = 2;
// Period in blocks
uint64 period = 3;
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 4 [(gogoproto.nullable) = false];
ExecutionStage execution_stage = 5;
// Execution stages when messages will be executed
repeated ExecutionStage execution_stages = 5 [(gogoproto.nullable) = false];
}

// MsgAddScheduleResponse defines the response structure for executing a
Expand All @@ -50,7 +53,7 @@ message MsgRemoveSchedule {

// Authority is the address of the governance account.
string authority = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// Name of the schedule
string name = 2;
}

Expand Down
5 changes: 4 additions & 1 deletion proto/neutron/cron/v1/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import "gogoproto/gogo.proto";

option go_package = "github.com/neutron-org/neutron/v4/x/cron/types/v1";

// Schedule defines the schedule for execution
message Schedule {
// Name of schedule
string name = 1;
// Period in blocks
uint64 period = 2;
// Msgs that will be executed every period amount of time
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 3 [(gogoproto.nullable) = false];
// Last execution's block height
uint64 last_execute_height = 4;
}

// MsgExecuteContract defines the contract and the message to pass
message MsgExecuteContract {
// Contract is the address of the smart contract
string contract = 1;
// Msg is json encoded message to be passed to the contract
string msg = 2;
}

// ScheduleCount defines the number of current schedules
message ScheduleCount {
// Count is the number of current schedules
int32 count = 1;
Expand Down
8 changes: 4 additions & 4 deletions wasmbinding/bindings/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ type ForceTransfer struct {

// AddSchedule adds new schedule to the cron module
type AddSchedule struct {
Name string `json:"name"`
Period uint64 `json:"period"`
Msgs []MsgExecuteContract `json:"msgs"`
ExecutionStage string `json:"execution_stage"`
Name string `json:"name"`
Period uint64 `json:"period"`
Msgs []MsgExecuteContract `json:"msgs"`
ExecutionStages []string `json:"execution_stages"`
}

// AddScheduleResponse holds response AddSchedule
Expand Down
11 changes: 4 additions & 7 deletions wasmbinding/message_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,15 +985,12 @@ func (m *CustomMessenger) addSchedule(ctx sdk.Context, contractAddr sdk.AccAddre
})
}

var executionStage crontypes.ExecutionStage

if v, ok := crontypes.ExecutionStage_value[addSchedule.ExecutionStage]; !ok {
executionStage = crontypes.ExecutionStage_END_BLOCKER
} else {
executionStage = crontypes.ExecutionStage(v)
executionStages := make([]crontypes.ExecutionStage, 0, len(addSchedule.ExecutionStages))
for _, executionStage := range addSchedule.ExecutionStages {
executionStages = append(executionStages, crontypes.ExecutionStage(crontypes.ExecutionStage_value[executionStage]))
}

err := m.CronKeeper.AddSchedule(ctx, addSchedule.Name, addSchedule.Period, msgs, executionStage)
err := m.CronKeeper.AddSchedule(ctx, addSchedule.Name, addSchedule.Period, msgs, executionStages)
if err != nil {
ctx.Logger().Error("failed to addSchedule",
"from_address", contractAddr.String(),
Expand Down
2 changes: 1 addition & 1 deletion x/cron/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState) {
// Set all the schedules
for _, elem := range genState.ScheduleList {
err := k.AddSchedule(ctx, elem.Name, elem.Period, elem.Msgs, elem.ExecutionStage)
err := k.AddSchedule(ctx, elem.Name, elem.Period, elem.Msgs, elem.ExecutionStages)
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion x/cron/keeper/grpc_query_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func createNSchedule(t *testing.T, ctx sdk.Context, k *cronkeeper.Keeper, n int3
item.Period = 1000
item.Msgs = nil
item.LastExecuteHeight = uint64(ctx.BlockHeight())
item.ExecutionStages = []types.ExecutionStage{types.ExecutionStage_EXECUTION_STAGE_END_BLOCKER}

err := k.AddSchedule(ctx, item.Name, item.Period, item.Msgs, item.ExecutionStage)
err := k.AddSchedule(ctx, item.Name, item.Period, item.Msgs, item.ExecutionStages)
require.NoError(t, err)

res[idx] = item
Expand Down
36 changes: 23 additions & 13 deletions x/cron/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (

MetricLabelSuccess = "success"
MetricLabelScheduleName = "schedule_name"

schedulesExecutionStages map[string]map[types.ExecutionStage]struct{}
)

type (
Expand All @@ -47,6 +49,8 @@ func NewKeeper(
accountKeeper types.AccountKeeper,
authority string,
) *Keeper {
schedulesExecutionStages = make(map[string]map[types.ExecutionStage]struct{})

return &Keeper{
cdc: cdc,
storeKey: storeKey,
Expand All @@ -66,42 +70,50 @@ func (k *Keeper) Logger(ctx sdk.Context) log.Logger {

// ExecuteReadySchedules gets all schedules that are due for execution (with limit that is equal to Params.Limit)
// and executes messages in each one
// NOTE that errors in contract calls rollback all already executed messages
func (k *Keeper) ExecuteReadySchedules(ctx sdk.Context, executionStage types.ExecutionStage) {
telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), LabelExecuteReadySchedules)
schedules := k.getSchedulesReadyForExecution(ctx)

for _, schedule := range schedules {
if isExecutableAtTheStage(schedule, executionStage) {
if _, ok := schedulesExecutionStages[schedule.Name][executionStage]; ok {
err := k.executeSchedule(ctx, schedule)
recordExecutedSchedule(err, schedule)
}
}
}

// AddSchedule adds new schedule to execution for every block `period`.
// AddSchedule adds a new schedule to be executed every certain number of blocks, specified in the `period`.
// First schedule execution is supposed to be on `now + period` block.
func (k *Keeper) AddSchedule(
ctx sdk.Context,
name string,
period uint64,
msgs []types.MsgExecuteContract,
executionStage types.ExecutionStage,
executionStages []types.ExecutionStage,
) error {
if k.scheduleExists(ctx, name) {
return fmt.Errorf("schedule already exists with name=%v", name)
}

schedulesExecutionStages[name] = make(map[types.ExecutionStage]struct{})
execStages := make([]types.ExecutionStage, 0)
for _, executionStage := range executionStages {
if _, ok := types.ExecutionStage_name[int32(executionStage)]; !ok {
executionStage = types.ExecutionStage_EXECUTION_STAGE_END_BLOCKER
}

if _, ok := schedulesExecutionStages[name][executionStage]; !ok {
schedulesExecutionStages[name][executionStage] = struct{}{}
execStages = append(execStages, executionStage)
}
}

schedule := types.Schedule{
Name: name,
Period: period,
Msgs: msgs,
LastExecuteHeight: uint64(ctx.BlockHeight()), // let's execute newly added schedule on `now + period` block
ExecutionStage: executionStage,
}

if _, ok := types.ExecutionStage_name[int32(executionStage)]; !ok {
schedule.ExecutionStage = types.ExecutionStage_END_BLOCKER
ExecutionStages: execStages,
}

k.storeSchedule(ctx, schedule)
Expand All @@ -116,6 +128,8 @@ func (k *Keeper) RemoveSchedule(ctx sdk.Context, name string) {
return
}

delete(schedulesExecutionStages, name)

k.changeTotalCount(ctx, -1)
k.removeSchedule(ctx, name)
}
Expand Down Expand Up @@ -183,10 +197,6 @@ func (k *Keeper) getSchedulesReadyForExecution(ctx sdk.Context) []types.Schedule
return res
}

func isExecutableAtTheStage(schedule types.Schedule, executionStage types.ExecutionStage) bool {
return schedule.ExecutionStage == executionStage || schedule.ExecutionStage == types.ExecutionStage_BOTH_BLOCKERS
}

// executeSchedule executes all msgs in a given schedule and changes LastExecuteHeight
// if at least one msg execution fails, rollback all messages
func (k *Keeper) executeSchedule(ctx sdk.Context, schedule types.Schedule) error {
Expand Down
Loading

0 comments on commit 5c3d8f1

Please sign in to comment.