Skip to content

Commit

Permalink
start passing the variable to next step
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 12, 2024
1 parent c52713d commit 8c7bc1c
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
}

func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)
vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges).WithLogger(x.logger)

if err != nil {
return nil, fmt.Errorf("vm failed to initialize: %w", err)
Expand Down
42 changes: 37 additions & 5 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"sync"
"time"

sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/dop251/goja"
"github.com/ginkgoch/godash/v2"
"github.com/pingcap/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -55,6 +57,8 @@ type VM struct {
plans map[string]*Step
entrypoint string
instructionCount int64

logger sdklogging.Logger
}

func NewVM() (*VM, error) {
Expand All @@ -75,6 +79,11 @@ func (v *VM) Reset() {
v.instructionCount = 0
}

func (v *VM) GetNodeNameAsVar(nodeID string) string {
name := v.TaskNodes[nodeID].Name
return name
}

func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
v := &VM{
Status: VMStateInitialize,
Expand Down Expand Up @@ -247,23 +256,29 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
p := NewRestProrcessor()

// only evaluate string when there is string interpolation
if nodeValue.Body != "" && strings.Contains(nodeValue.Body, "$") {
if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) {
nodeValue2 := &avsproto.RestAPINode{
Url: macros.RenderString(nodeValue.Url, macroEnvs),
Headers: nodeValue.Headers,
Method: nodeValue.Method,
Body: strings.Clone(nodeValue.Body),
}
vm := goja.New()
// TODO: dynamically set var instead of hardcode the name
// client would need to send this over
vm.Set("trigger1", v.vars["trigger1"])

for key, value := range v.vars {
vm.Set(key, map[string]any{
"data": value,
})
}

renderBody, err := vm.RunString(nodeValue.Body)
if err == nil {
nodeValue2.Body = renderBody.Export().(string)
} else {
fmt.Println("error render string with goja", err)
}
executionLog, err = p.Execute(node.Id, nodeValue2)
executionLog, result, err = p.Execute(node.Id, nodeValue2)
v.vars[v.GetNodeNameAsVar(stepID)] = result
} else {
executionLog, err = p.Execute(node.Id, nodeValue)
}
Expand All @@ -289,11 +304,28 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
}
}
}
} else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil {
executionLog, err = v.runGraphQL(node.Id, nodeValue)
}

return executionLog, err
}

func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
g, err := NewGraphqlQueryProcessor(node.Url)
if err != nil {
return nil, err
}
executionLog, result, err := g.Execute(stepID, node)
v.vars[v.GetNodeNameAsVar(stepID)] = result
if err != nil {
log.Error("error execute graphql node", "task_id", v.TaskID, "step", node.Id, "url", node.Url, "error", err)
}
v.ExecutionLogs = append(v.ExecutionLogs, executionLog)

return executionLog, nil
}

func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) {
t0 := time.Now()
s := &avsproto.Execution_Step{
Expand Down
6 changes: 3 additions & 3 deletions core/taskengine/vm_runner_graphql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) {
}, nil
}

func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, any, error) {
ctx := context.Background()
t0 := time.Now().Unix()
step := &avsproto.Execution_Step{
Expand All @@ -68,11 +68,11 @@ func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQue
query := graphql.NewRequest(node.Query)
err = r.client.Run(ctx, query, &resp)
if err != nil {
return step, err
return step, nil, err
}

step.Log = r.sb.String()
data, err := json.Marshal(resp)
step.OutputData = string(data)
return step, err
return step, resp, err
}
100 changes: 99 additions & 1 deletion examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ const main = async (cmd) => {
case "schedule-monitor":
scheduleMonitor(owner, token, process.argv[3]);
break;
case "schedule-aave":
scheduleAaveMonitor(owner, token);
break;
case "schedule":
case "schedule-cron":
case "schedule-event":
Expand Down Expand Up @@ -370,7 +373,6 @@ const main = async (cmd) => {
case "delete":
await deleteTask(owner, token, process.argv[3]);
break;

case "wallet":
await getWallets(owner, token);
break;
Expand Down Expand Up @@ -398,6 +400,7 @@ const main = async (cmd) => {
schedule-cron <smart-wallet-address>: to schedule a task that run on cron
schedule-event <smart-wallet-address>: to schedule a task that run on occurenct of an event
schedule-generic: to schedule a task with an arbitrary contract query
schedule-aave: monitor and report aavee liquidity rate every block
monitor-address <wallet-address>: to monitor erc20 in/out for an address
trigger <task-id> <trigger-mark>: manually trigger a task. Example:
trigger abcdef '{"block_number":1234}' for blog trigger
Expand Down Expand Up @@ -658,6 +661,101 @@ async function scheduleMonitor(owner, token, target) {
return result;
}

// setup a task to monitor in/out transfer for a wallet and send notification
async function scheduleAaveMonitor(owner, token) {
const wallets = await getWallets(owner, token);
const smartWalletAddress = wallets[0].address;

const metadata = new grpc.Metadata();
metadata.add("authkey", token);

let trigger = {
name: "trigger1",
block: {
interval: 1,
},
};

const getReserveId = UlidMonotonic.generate().toCanonical();
const sendSummaryId = UlidMonotonic.generate().toCanonical();

const result = await asyncRPC(
client,
"CreateTask",
{
smart_wallet_address: smartWalletAddress,
nodes: [
{
id: getReserveId,
name: 'getReserveUSDC',
graphql_query: {
url: 'https://gateway.thegraph.com/api/10186dcf11921c7d1bc140721c69da38/subgraphs/id/Cd2gEDVeqnjBn1hSeqFMitw8Q1iiyV9FYUZkLNRcL87g',
query: `
{
reserves(where: {underlyingAsset: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"}) {
id
underlyingAsset
name
decimals
liquidityRate
aToken {
id
}
sToken {
id
}
}
}
`
}
},
{
id: sendSummaryId,
name: 'notification',
rest_api: {
url: "https://api.telegram.org/bot{{notify_bot_token}}/sendMessage?parse_mode=MarkdownV2",
//url: `https://webhook.site/ca416047-5ba0-4485-8f98-76790b63add7`,
method: "POST",
body: `JSON.stringify({
chat_id:-4609037622,
text: \`Current USDC liquidity rate in RAY unit is \${getReserveUSDC.data.reserves[0].liquidityRate} \`
})`,
headers: {
"content-type": "application/json"
}
}
},
],

edges: [
{
id: UlidMonotonic.generate().toCanonical(),
// __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be draw on the UI to show what is the entrypoint
source: "__TRIGGER__",
target: getReserveId,
},
{
id: UlidMonotonic.generate().toCanonical(),
// __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be draw on the UI to show what is the entrypoint
source: getReserveId,
target: sendSummaryId,
},
],

trigger,
start_at: Math.floor(Date.now() / 1000) + 30,
expired_at: Math.floor(Date.now() / 1000 + 3600 * 24 * 30),
memo: `Montoring USDC aavee on ethereum`,
},
metadata
);

console.log("create task", result);

return result;
}


(async () => {
try {
main(process.argv[2]);
Expand Down

0 comments on commit 8c7bc1c

Please sign in to comment.