Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename TriggerMark -> TriggerMetadata #68

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (n *Engine) AggregateChecksResult(address string, payload *avsproto.NotifyT
n.logger.Info("processed aggregator check hit", "operator", address, "task_id", payload.TaskId)
n.lock.Unlock()

data, err := json.Marshal(payload.TriggerMarker)
data, err := json.Marshal(payload.TriggerMetadata)
if err != nil {
n.logger.Error("error serialize trigger to json", err)
return err
Expand Down Expand Up @@ -521,16 +521,16 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

data, err := json.Marshal(payload.TriggerMark)
data, err := json.Marshal(payload.TriggerMetadata)
if err != nil {
n.logger.Error("error serialize trigger to json", err)
return nil, status.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}

if payload.RunInline {
if payload.IsBlocking {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.db, n.logger)
execution, err := executor.RunTask(task, payload.TriggerMark)
execution, err := executor.RunTask(task, payload.TriggerMetadata)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Result: true,
Expand Down
28 changes: 14 additions & 14 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
return fmt.Errorf("fail to load task: %s", job.Name)
}

triggerMark := &avsproto.TriggerMark{}
triggerMetadata := &avsproto.TriggerMetadata{}
// A task executor data is the trigger mark
// ref: AggregateChecksResult
err = json.Unmarshal(job.Data, triggerMark)
err = json.Unmarshal(job.Data, triggerMetadata)
if err != nil {
return fmt.Errorf("error decode job payload when executing task: %s with job id %d", task.Id, job.ID)
}

_, err = x.RunTask(task, triggerMark)
_, err = x.RunTask(task, triggerMetadata)
return err
}

func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMark) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMark, task.Nodes, task.Edges)
func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)

if err != nil {
return nil, fmt.Errorf("vm failed to initialize: %w", err)
Expand All @@ -98,17 +98,17 @@ func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMa
}

execution := &avsproto.Execution{
Id: ulid.Make().String(),
StartAt: t0.Unix(),
EndAt: t1.Unix(),
Success: err == nil,
Error: "",
Steps: vm.ExecutionLogs,
TriggerMark: triggerMark,
Id: ulid.Make().String(),
StartAt: t0.Unix(),
EndAt: t1.Unix(),
Success: err == nil,
Error: "",
Steps: vm.ExecutionLogs,
TriggerMetadata: triggerMetadata,
}

if runTaskErr != nil {
x.logger.Error("error executing task", "error", err, "task_id", task.Id, "triggermark", triggerMark)
x.logger.Error("error executing task", "error", err, "task_id", task.Id, "triggermark", triggerMetadata)
execution.Error = runTaskErr.Error()
}

Expand All @@ -129,7 +129,7 @@ func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMa
}

if runTaskErr == nil {
x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMark)
x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMetadata)
return execution, nil
}
return execution, fmt.Errorf("Error executing task %s %v", task.Id, runTaskErr)
Expand Down
8 changes: 4 additions & 4 deletions core/taskengine/trigger/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

type TriggerMark[T any] struct {
type TriggerMetadata[T any] struct {
TaskID string

Marker T
Expand All @@ -25,10 +25,10 @@ type BlockTrigger struct {
schedule map[int64]map[string]bool

// channel that we will push the trigger information back
triggerCh chan TriggerMark[int64]
triggerCh chan TriggerMetadata[int64]
}

func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMark[int64]) *BlockTrigger {
func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64]) *BlockTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (b *BlockTrigger) Run(ctx context.Context) error {
z := new(big.Int)
if z.Mod(header.Number, big.NewInt(int64(interval))).Cmp(zero) == 0 {
for taskID, _ := range tasks {
b.triggerCh <- TriggerMark[int64]{
b.triggerCh <- TriggerMetadata[int64]{
TaskID: taskID,
Marker: header.Number.Int64(),
}
Expand Down
6 changes: 3 additions & 3 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type EventTrigger struct {
checks sync.Map

// channel that we will push the trigger information back
triggerCh chan TriggerMark[EventMark]
triggerCh chan TriggerMetadata[EventMark]
}

func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMark[EventMark]) *EventTrigger {
func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *EventTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
check := value.(*Check)
if hit, err := evt.Evaluate(&event, check.Program); err == nil && hit {
evt.logger.Info("check hit, notify aggregator", "task_id", key)
evt.triggerCh <- TriggerMark[EventMark]{
evt.triggerCh <- TriggerMetadata[EventMark]{
TaskID: key.(string),
Marker: EventMark{
BlockNumber: event.BlockNumber,
Expand Down
2 changes: 1 addition & 1 deletion core/taskengine/trigger/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestChainlinkLatestAnswer(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMark[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000))

envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
Expand Down
12 changes: 6 additions & 6 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (v *VM) Reset() {
v.instructionCount = 0
}

func NewVMWithData(taskID string, triggerMark *avsproto.TriggerMark, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
v := &VM{
Status: VMStateInitialize,
TaskEdges: edges,
Expand All @@ -92,24 +92,24 @@ func NewVMWithData(taskID string, triggerMark *avsproto.TriggerMark, nodes []*av
v.vars = macros.GetEnvs(map[string]any{})

// popular trigger data for trigger variable
if triggerMark != nil && triggerMark.LogIndex > 0 && triggerMark.TxHash != "" {
if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMark.TxHash))
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}

var event *types.Log
//event := receipt.Logs[triggerMark.LogIndex]
//event := receipt.Logs[triggerMetadata.LogIndex]

for _, l := range receipt.Logs {
if uint64(l.Index) == triggerMark.LogIndex {
if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
}
}

if event == nil {
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMark.TxHash, triggerMark.LogIndex)
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}

tokenMetadata, err := GetMetadataForTransfer(event)
Expand Down
2 changes: 1 addition & 1 deletion core/taskengine/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func TestEvaluateEvent(t *testing.T) {
},
}

mark := avsproto.TriggerMark{
mark := avsproto.TriggerMetadata{
BlockNumber: 7212417,
TxHash: "0x53beb2163994510e0984b436ebc828dc57e480ee671cfbe7ed52776c2a4830c8",
LogIndex: 98,
Expand Down
2 changes: 1 addition & 1 deletion examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async function triggerTask(owner, token, taskId, triggerMark) {
"TriggerTask",
// If want to run async, comment this line out
//{ task_id: taskId, triggerMark, },
{ task_id: taskId, triggerMark, run_inline: true },
{ task_id: taskId, triggerMark, is_blocking: true },
metadata
);

Expand Down
46 changes: 23 additions & 23 deletions examples/static_codegen/avs_grpc_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,37 @@ function deserialize_aggregator_CreateTaskResp(buffer_arg) {
return avs_pb.CreateTaskResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_CreateWalletReq(arg) {
if (!(arg instanceof avs_pb.CreateWalletReq)) {
throw new Error('Expected argument of type aggregator.CreateWalletReq');
function serialize_aggregator_GetKeyReq(arg) {
if (!(arg instanceof avs_pb.GetKeyReq)) {
throw new Error('Expected argument of type aggregator.GetKeyReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_CreateWalletReq(buffer_arg) {
return avs_pb.CreateWalletReq.deserializeBinary(new Uint8Array(buffer_arg));
function deserialize_aggregator_GetKeyReq(buffer_arg) {
return avs_pb.GetKeyReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_CreateWalletResp(arg) {
if (!(arg instanceof avs_pb.CreateWalletResp)) {
throw new Error('Expected argument of type aggregator.CreateWalletResp');
function serialize_aggregator_GetWalletReq(arg) {
if (!(arg instanceof avs_pb.GetWalletReq)) {
throw new Error('Expected argument of type aggregator.GetWalletReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_CreateWalletResp(buffer_arg) {
return avs_pb.CreateWalletResp.deserializeBinary(new Uint8Array(buffer_arg));
function deserialize_aggregator_GetWalletReq(buffer_arg) {
return avs_pb.GetWalletReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_GetKeyReq(arg) {
if (!(arg instanceof avs_pb.GetKeyReq)) {
throw new Error('Expected argument of type aggregator.GetKeyReq');
function serialize_aggregator_GetWalletResp(arg) {
if (!(arg instanceof avs_pb.GetWalletResp)) {
throw new Error('Expected argument of type aggregator.GetWalletResp');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_GetKeyReq(buffer_arg) {
return avs_pb.GetKeyReq.deserializeBinary(new Uint8Array(buffer_arg));
function deserialize_aggregator_GetWalletResp(buffer_arg) {
return avs_pb.GetWalletResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_IdReq(arg) {
Expand Down Expand Up @@ -240,16 +240,16 @@ getNonce: {
responseSerialize: serialize_aggregator_NonceResp,
responseDeserialize: deserialize_aggregator_NonceResp,
},
createWallet: {
path: '/aggregator.Aggregator/CreateWallet',
getWallet: {
path: '/aggregator.Aggregator/GetWallet',
requestStream: false,
responseStream: false,
requestType: avs_pb.CreateWalletReq,
responseType: avs_pb.CreateWalletResp,
requestSerialize: serialize_aggregator_CreateWalletReq,
requestDeserialize: deserialize_aggregator_CreateWalletReq,
responseSerialize: serialize_aggregator_CreateWalletResp,
responseDeserialize: deserialize_aggregator_CreateWalletResp,
requestType: avs_pb.GetWalletReq,
responseType: avs_pb.GetWalletResp,
requestSerialize: serialize_aggregator_GetWalletReq,
requestDeserialize: deserialize_aggregator_GetWalletReq,
responseSerialize: serialize_aggregator_GetWalletResp,
responseDeserialize: deserialize_aggregator_GetWalletResp,
},
listWallets: {
path: '/aggregator.Aggregator/ListWallets',
Expand Down
Loading
Loading