diff --git a/execution/execution.go b/execution/execution.go index 9dc835d88..26952af75 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -41,6 +41,7 @@ type Execution struct { Status Status `hash:"-"` InstanceHash hash.Hash `hash:"name:instanceHash"` TaskKey string `hash:"name:taskKey"` + StepID string `hash:"name:stepID"` Tags []string `hash:"name:tags"` Inputs map[string]interface{} `hash:"name:inputs"` Outputs map[string]interface{} `hash:"-"` @@ -48,7 +49,7 @@ type Execution struct { } // New returns a new execution. It returns an error if inputs are invalid. -func New(workflowHash, instanceHash, parentHash, eventHash hash.Hash, taskKey string, inputs map[string]interface{}, tags []string) *Execution { +func New(workflowHash, instanceHash, parentHash, eventHash hash.Hash, stepID string, taskKey string, inputs map[string]interface{}, tags []string) *Execution { exec := &Execution{ WorkflowHash: workflowHash, EventHash: eventHash, @@ -56,6 +57,7 @@ func New(workflowHash, instanceHash, parentHash, eventHash hash.Hash, taskKey st ParentHash: parentHash, Inputs: inputs, TaskKey: taskKey, + StepID: stepID, Tags: tags, Status: Created, } diff --git a/execution/execution_test.go b/execution/execution_test.go index 8db09e1e2..7a7132584 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -18,7 +18,7 @@ func TestNewFromService(t *testing.T) { tags = []string{"tag"} ) - execution := New(nil, hash, parentHash, eventHash, taskKey, nil, tags) + execution := New(nil, hash, parentHash, eventHash, "", taskKey, nil, tags) require.NotNil(t, execution) require.Equal(t, hash, execution.InstanceHash) require.Equal(t, parentHash, execution.ParentHash) @@ -30,7 +30,7 @@ func TestNewFromService(t *testing.T) { } func TestExecute(t *testing.T) { - e := New(nil, nil, nil, nil, "", nil, nil) + e := New(nil, nil, nil, nil, "", "", nil, nil) require.NoError(t, e.Execute()) require.Equal(t, InProgress, e.Status) require.Error(t, e.Execute()) @@ -38,7 +38,7 @@ func TestExecute(t *testing.T) { func TestComplete(t *testing.T) { output := map[string]interface{}{"foo": "bar"} - e := New(nil, nil, nil, nil, "", nil, nil) + e := New(nil, nil, nil, nil, "", "", nil, nil) e.Execute() require.NoError(t, e.Complete(output)) @@ -49,7 +49,7 @@ func TestComplete(t *testing.T) { func TestFailed(t *testing.T) { err := errors.New("test") - e := New(nil, nil, nil, nil, "", nil, nil) + e := New(nil, nil, nil, nil, "", "", nil, nil) e.Execute() require.NoError(t, e.Failed(err)) require.Equal(t, Failed, e.Status) @@ -68,7 +68,7 @@ func TestExecutionHash(t *testing.T) { ids := make(map[string]bool) f := func(instanceHash, parentHash, eventID []byte, taskKey, input string, tags []string) bool { - e := New(nil, instanceHash, parentHash, eventID, taskKey, map[string]interface{}{"input": input}, tags) + e := New(nil, instanceHash, parentHash, eventID, "", taskKey, map[string]interface{}{"input": input}, tags) if ids[string(e.Hash)] { return false } diff --git a/protobuf/api/workflow.pb.go b/protobuf/api/workflow.pb.go index 75c512a7a..a8e6ce3bc 100644 --- a/protobuf/api/workflow.pb.go +++ b/protobuf/api/workflow.pb.go @@ -27,7 +27,8 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type CreateWorkflowRequest struct { Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` Trigger *types.Workflow_Trigger `protobuf:"bytes,3,opt,name=trigger,proto3" json:"trigger,omitempty"` - Tasks []*types.Workflow_Task `protobuf:"bytes,4,rep,name=tasks,proto3" 
json:"tasks,omitempty"` + Nodes []*types.Workflow_Node `protobuf:"bytes,4,rep,name=nodes,proto3" json:"nodes,omitempty"` + Edges []*types.Workflow_Edge `protobuf:"bytes,5,rep,name=edges,proto3" json:"edges,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -72,9 +73,16 @@ func (m *CreateWorkflowRequest) GetTrigger() *types.Workflow_Trigger { return nil } -func (m *CreateWorkflowRequest) GetTasks() []*types.Workflow_Task { +func (m *CreateWorkflowRequest) GetNodes() []*types.Workflow_Node { if m != nil { - return m.Tasks + return m.Nodes + } + return nil +} + +func (m *CreateWorkflowRequest) GetEdges() []*types.Workflow_Edge { + if m != nil { + return m.Edges } return nil } @@ -320,28 +328,29 @@ func init() { func init() { proto.RegisterFile("protobuf/api/workflow.proto", fileDescriptor_2568c098005b7b27) } var fileDescriptor_2568c098005b7b27 = []byte{ - // 325 bytes of a gzipped FileDescriptorProto + // 342 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4d, 0x4f, 0xfa, 0x40, - 0x10, 0xc6, 0x29, 0xe5, 0xcf, 0x5f, 0x86, 0x83, 0x66, 0xe4, 0x65, 0x2d, 0x31, 0x21, 0x3d, 0x35, - 0xbe, 0x2c, 0x01, 0xcf, 0x9e, 0xd0, 0x70, 0xf1, 0xd4, 0x98, 0x78, 0x5e, 0x92, 0x01, 0x9a, 0x12, - 0xbb, 0x76, 0x97, 0x10, 0xbe, 0x80, 0xf1, 0x63, 0x1b, 0xb6, 0xad, 0xc6, 0xed, 0x9a, 0x78, 0x9b, - 0xcc, 0xfc, 0xe6, 0xe9, 0x3c, 0x4f, 0x17, 0x46, 0x32, 0xcf, 0x74, 0xb6, 0xdc, 0xad, 0x26, 0x42, - 0x26, 0x93, 0x7d, 0x96, 0xa7, 0xab, 0x6d, 0xb6, 0xe7, 0xa6, 0x8b, 0xbe, 0x90, 0x49, 0x70, 0xf9, - 0x45, 0xe8, 0x83, 0x24, 0x65, 0x31, 0xe1, 0x87, 0x07, 0xfd, 0x79, 0x4e, 0x42, 0xd3, 0x4b, 0x39, - 0x88, 0xe9, 0x6d, 0x47, 0x4a, 0xe3, 0x19, 0xf8, 0x29, 0x1d, 0x58, 0x73, 0xec, 0x45, 0x9d, 0xf8, - 0x58, 0xe2, 0x14, 0xfe, 0xeb, 0x3c, 0x59, 0xaf, 0x29, 0x67, 0xfe, 0xd8, 0x8b, 0xba, 0xb3, 0x21, - 0x37, 0x9a, 0xbc, 0x5a, 0xe5, 0xcf, 0xc5, 0x38, 0xae, 0x38, 0xbc, 0x82, 0x7f, 0x5a, 0xa8, 0x54, - 0xb1, 0xd6, 0xd8, 0x8f, 0xba, 0xb3, 0x5e, 0x6d, 0x41, 0xa8, 0x34, 0x2e, 0x90, 0xf0, 0x06, 0x06, - 0xf6, 0x25, 0x4a, 0x66, 0xaf, 0x8a, 0x10, 0xa1, 0xb5, 0x11, 0x6a, 0xc3, 0x3c, 0x73, 0x8b, 0xa9, - 0xc3, 0x6b, 0xe8, 0x3f, 0xd0, 0x96, 0xea, 0x77, 0xbb, 0x60, 0x06, 0x03, 0x1b, 0x2e, 0xa4, 0xc3, - 0x08, 0x70, 0x41, 0xfa, 0x2f, 0x1a, 0x7d, 0x38, 0x7f, 0x4a, 0x94, 0x8d, 0x86, 0x8f, 0xd0, 0xfb, - 0xd9, 0x2e, 0x6f, 0xbe, 0x85, 0x4e, 0x15, 0xb5, 0x62, 0x9e, 0x71, 0x7f, 0x6a, 0xb9, 0x8f, 0xbf, - 0x89, 0xd9, 0x7b, 0x13, 0x4e, 0xaa, 0x3e, 0xce, 0xa1, 0x5d, 0x24, 0x81, 0x01, 0x17, 0x32, 0xe1, - 0xce, 0x1f, 0x14, 0x8c, 0x9c, 0xb3, 0xd2, 0x57, 0xe3, 0x28, 0x52, 0x78, 0x2e, 0x45, 0x9c, 0x69, - 0x95, 0x22, 0xbf, 0x84, 0xd3, 0xc0, 0x29, 0xf8, 0x0b, 0xd2, 0x38, 0x34, 0x54, 0x3d, 0xa8, 0xc0, - 0xb6, 0x14, 0x36, 0xf0, 0x1e, 0x5a, 0xc7, 0x40, 0x90, 0x99, 0x1d, 0x47, 0x64, 0xc1, 0x85, 0x63, - 0x52, 0x7d, 0x71, 0xd9, 0x36, 0xef, 0xf2, 0xee, 0x33, 0x00, 0x00, 0xff, 0xff, 0xce, 0x68, 0x11, - 0xba, 0xda, 0x02, 0x00, 0x00, + 0x10, 0xc6, 0x29, 0x05, 0xfe, 0x7f, 0x86, 0x83, 0x66, 0xe4, 0x65, 0x2d, 0x31, 0x21, 0x3d, 0x35, + 0xbe, 0x94, 0x80, 0x67, 0x4f, 0x48, 0xb8, 0x18, 0x0f, 0x8d, 0x89, 0xe7, 0x92, 0x0e, 0xa5, 0x81, + 0xb0, 0xb5, 0xbb, 0x84, 0xf0, 0x05, 0xfc, 0x44, 0x7e, 0x40, 0xd3, 0x6d, 0xab, 0x71, 0x59, 0x13, + 0x6f, 0x9b, 0x99, 0xdf, 0x3c, 0x9d, 0xe7, 0x99, 0xc2, 0x30, 0xcd, 0xb8, 0xe4, 0xcb, 0xfd, 0x6a, + 0x1c, 0xa6, 0xc9, 0xf8, 0xc0, 0xb3, 0xcd, 0x6a, 0xcb, 0x0f, 0xbe, 0xaa, 0xa2, 0x1d, 0xa6, 0x89, + 0x73, 0xf5, 0x45, 0xc8, 0x63, 0x4a, 0x42, 
0x63, 0xdc, 0x0f, 0x0b, 0x7a, 0xb3, 0x8c, 0x42, 0x49, + 0xaf, 0x65, 0x23, 0xa0, 0xb7, 0x3d, 0x09, 0x89, 0xe7, 0x60, 0x6f, 0xe8, 0xc8, 0xea, 0x23, 0xcb, + 0x6b, 0x07, 0xf9, 0x13, 0x27, 0xf0, 0x4f, 0x66, 0x49, 0x1c, 0x53, 0xc6, 0xec, 0x91, 0xe5, 0x75, + 0xa6, 0x03, 0x5f, 0x69, 0xfa, 0xd5, 0xa8, 0xff, 0x52, 0xb4, 0x83, 0x8a, 0xc3, 0x6b, 0x68, 0xee, + 0x78, 0x44, 0x82, 0x35, 0x46, 0xb6, 0xd7, 0x99, 0x76, 0xf5, 0x81, 0x67, 0x1e, 0x51, 0x50, 0x20, + 0x39, 0x4b, 0x51, 0x4c, 0x82, 0x35, 0xcd, 0xec, 0x3c, 0x8a, 0x29, 0x28, 0x10, 0xf7, 0x16, 0xfa, + 0xfa, 0xd6, 0x22, 0xe5, 0x3b, 0x41, 0x88, 0xd0, 0x58, 0x87, 0x62, 0xcd, 0x2c, 0xb5, 0xb7, 0x7a, + 0xbb, 0x37, 0xd0, 0x7b, 0xa4, 0x2d, 0x9d, 0x7a, 0x34, 0xc1, 0x0c, 0xfa, 0x3a, 0x5c, 0x48, 0xbb, + 0x1e, 0xe0, 0x82, 0xe4, 0x5f, 0x34, 0x7a, 0x70, 0xf1, 0x94, 0x08, 0x1d, 0x75, 0xe7, 0xd0, 0xfd, + 0x59, 0x2e, 0x77, 0xbe, 0x83, 0x76, 0x75, 0x16, 0xc1, 0x2c, 0xe5, 0xfe, 0x4c, 0x73, 0x1f, 0x7c, + 0x13, 0xd3, 0xf7, 0x3a, 0xfc, 0xaf, 0xea, 0x38, 0x83, 0x56, 0x91, 0x04, 0x3a, 0x7e, 0x98, 0x26, + 0xbe, 0xf1, 0x98, 0xce, 0xd0, 0xd8, 0x2b, 0x7d, 0xd5, 0x72, 0x91, 0xc2, 0x73, 0x29, 0x62, 0x4c, + 0xab, 0x14, 0xf9, 0x25, 0x9c, 0x1a, 0x4e, 0xc0, 0x5e, 0x90, 0xc4, 0x81, 0xa2, 0x4e, 0x83, 0x72, + 0x74, 0x4b, 0x6e, 0x0d, 0x1f, 0xa0, 0x91, 0x07, 0x82, 0x4c, 0xcd, 0x18, 0x22, 0x73, 0x2e, 0x0d, + 0x9d, 0xea, 0x8b, 0xcb, 0x96, 0xfa, 0x87, 0xef, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd5, 0xfd, + 0xb2, 0x54, 0x06, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/protobuf/api/workflow.proto b/protobuf/api/workflow.proto index 91b5b93ae..adffb7250 100644 --- a/protobuf/api/workflow.proto +++ b/protobuf/api/workflow.proto @@ -30,7 +30,8 @@ service Workflow { message CreateWorkflowRequest { string key = 2; // Workflow's key types.Workflow.Trigger trigger = 3; // Trigger for the workflow. - repeated types.Workflow.Task tasks = 4; // Task to execute when the trigger is valid. + repeated types.Workflow.Node nodes = 4; // List of nodes of the workflow. + repeated types.Workflow.Edge edges = 5; // List of edges of the workflow. } // The response's data for the `Create` API. diff --git a/protobuf/types/execution.pb.go b/protobuf/types/execution.pb.go index 8145a6706..3554b79b7 100644 --- a/protobuf/types/execution.pb.go +++ b/protobuf/types/execution.pb.go @@ -67,8 +67,6 @@ func (Status) EnumDescriptor() ([]byte, []int) { type Execution struct { // Hash is a unique hash to identify execution. Hash string `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` - // wokflowHash is the unique hash of the workflow associated to this execution. - WorkflowHash string `protobuf:"bytes,11,opt,name=workflowHash,proto3" json:"workflowHash,omitempty"` // parentHash is the unique hash of parent execution. if execution is triggered by another one, dependency execution considered as the parent. ParentHash string `protobuf:"bytes,2,opt,name=parentHash,proto3" json:"parentHash,omitempty"` // eventHash is unique event hash. @@ -86,7 +84,11 @@ type Execution struct { // error message of a failed execution. Error string `protobuf:"bytes,9,opt,name=error,proto3" json:"error,omitempty"` // tags are optionally associated with execution by the user. - Tags []string `protobuf:"bytes,10,rep,name=tags,proto3" json:"tags,omitempty"` + Tags []string `protobuf:"bytes,10,rep,name=tags,proto3" json:"tags,omitempty"` + // workflowHash is the unique hash of the workflow associated to this execution. 
+ WorkflowHash string `protobuf:"bytes,11,opt,name=workflowHash,proto3" json:"workflowHash,omitempty"` + // step of the workflow. + StepID string `protobuf:"bytes,12,opt,name=stepID,proto3" json:"stepID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -124,13 +126,6 @@ func (m *Execution) GetHash() string { return "" } -func (m *Execution) GetWorkflowHash() string { - if m != nil { - return m.WorkflowHash - } - return "" -} - func (m *Execution) GetParentHash() string { if m != nil { return m.ParentHash @@ -194,6 +189,20 @@ func (m *Execution) GetTags() []string { return nil } +func (m *Execution) GetWorkflowHash() string { + if m != nil { + return m.WorkflowHash + } + return "" +} + +func (m *Execution) GetStepID() string { + if m != nil { + return m.StepID + } + return "" +} + func init() { proto.RegisterEnum("types.Status", Status_name, Status_value) proto.RegisterType((*Execution)(nil), "types.Execution") @@ -202,29 +211,29 @@ func init() { func init() { proto.RegisterFile("protobuf/types/execution.proto", fileDescriptor_42e00237fcad5e7f) } var fileDescriptor_42e00237fcad5e7f = []byte{ - // 370 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xdd, 0x6a, 0xdb, 0x40, - 0x10, 0x85, 0x2b, 0xff, 0x48, 0xd5, 0xb8, 0x36, 0x62, 0x29, 0x54, 0x14, 0x63, 0x84, 0xa1, 0x60, - 0x0a, 0x95, 0x5a, 0xf7, 0x0d, 0x6a, 0x5a, 0x1a, 0x42, 0x20, 0xd8, 0xe4, 0x26, 0x77, 0x6b, 0x79, - 0x2c, 0x0b, 0xcb, 0xbb, 0x62, 0x7f, 0xe2, 0xf8, 0xbd, 0xf3, 0x00, 0x61, 0x47, 0x56, 0xe2, 0xdc, - 0xe4, 0x6e, 0xcf, 0xf9, 0xce, 0x48, 0xa3, 0xa3, 0x85, 0x49, 0xad, 0xa4, 0x91, 0x6b, 0xbb, 0xcd, - 0xcc, 0xa9, 0x46, 0x9d, 0xe1, 0x23, 0xe6, 0xd6, 0x94, 0x52, 0xa4, 0x04, 0x58, 0x9f, 0xec, 0xaf, - 0xe3, 0x42, 0xca, 0xa2, 0xc2, 0xec, 0x25, 0xad, 0x8d, 0xb2, 0xb9, 0x69, 0x42, 0xd3, 0xa7, 0x0e, - 0x84, 0x7f, 0xdb, 0x41, 0xc6, 0xa0, 0xb7, 0xe3, 0x7a, 0x17, 0x7b, 0x89, 0x37, 0x0b, 0x97, 0x74, - 0x66, 0x53, 0xf8, 0x74, 0x94, 0x6a, 0xbf, 0xad, 0xe4, 0xf1, 0xbf, 0x63, 0x03, 0x62, 0x6f, 0x3c, - 0x36, 0x01, 0xa8, 0xb9, 0x42, 0x61, 0x28, 0xd1, 0xa1, 0xc4, 0x85, 0xc3, 0xc6, 0x10, 0xe2, 0x43, - 0x8b, 0xbb, 0x84, 0x5f, 0x0d, 0xf6, 0x0d, 0x7c, 0x6d, 0xb8, 0xb1, 0x3a, 0xee, 0x25, 0xde, 0x6c, - 0x34, 0x1f, 0xa6, 0xb4, 0x79, 0xba, 0x22, 0x73, 0x79, 0x86, 0x6e, 0x91, 0x52, 0x68, 0xc3, 0x45, - 0x8e, 0xf4, 0x9c, 0x7e, 0xb3, 0xc8, 0xa5, 0xc7, 0x62, 0x08, 0x0c, 0xd7, 0xfb, 0x6b, 0x3c, 0xc5, - 0x3e, 0xe1, 0x56, 0xb2, 0x0c, 0xfc, 0x52, 0xd4, 0xd6, 0xe8, 0x38, 0x48, 0xbc, 0xd9, 0x60, 0xfe, - 0x25, 0x6d, 0x7a, 0x49, 0xdb, 0x5e, 0xd2, 0x15, 0xf5, 0xb2, 0x3c, 0xc7, 0xd8, 0x2f, 0x08, 0xa4, - 0x35, 0x34, 0xf1, 0xf1, 0xfd, 0x89, 0x36, 0xc7, 0x3e, 0x43, 0x1f, 0x95, 0x92, 0x2a, 0x0e, 0xe9, - 0xdd, 0x8d, 0x70, 0xa5, 0x1a, 0x5e, 0xe8, 0x18, 0x92, 0xae, 0x2b, 0xd5, 0x9d, 0xbf, 0xdf, 0x80, - 0xdf, 0x7c, 0x1d, 0x1b, 0x40, 0x70, 0x27, 0xf6, 0x42, 0x1e, 0x45, 0xf4, 0xc1, 0x89, 0x85, 0x42, - 0x6e, 0x70, 0x13, 0x79, 0x6c, 0x04, 0x70, 0x25, 0x6e, 0x95, 0x2c, 0x14, 0x6a, 0x1d, 0x75, 0xd8, - 0x10, 0xc2, 0x85, 0x3c, 0xd4, 0x15, 0x3a, 0xdc, 0x65, 0x00, 0xfe, 0x3f, 0x5e, 0x56, 0xb8, 0x89, - 0x7a, 0x7f, 0xe6, 0xf7, 0x3f, 0x8b, 0xd2, 0xec, 0xec, 0x3a, 0xcd, 0xe5, 0x21, 0x3b, 0xa0, 0x2e, - 0x7e, 0x6c, 0xa5, 0x15, 0x1b, 0xee, 0xfe, 0x6a, 0x86, 0xa2, 0x28, 0xc5, 0xc5, 0x05, 0xa0, 0x76, - 0xd7, 0x3e, 0xe9, 0xdf, 0xcf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x69, 0xef, 0x02, 0xc6, 0x47, 0x02, - 0x00, 0x00, + // 382 bytes of a gzipped FileDescriptorProto + 0x1f, 
0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0x6f, 0x8b, 0xd3, 0x40, + 0x10, 0xc6, 0xcd, 0xa5, 0x4d, 0xcc, 0xf4, 0xee, 0x08, 0x83, 0xe8, 0x22, 0xc7, 0x11, 0x0e, 0x84, + 0x22, 0x98, 0x68, 0xfd, 0x06, 0x9e, 0x8a, 0x87, 0x08, 0xd2, 0xc3, 0x37, 0xbe, 0xdb, 0xa6, 0xd3, + 0x34, 0x34, 0xdd, 0x0d, 0xfb, 0xc7, 0x7a, 0xdf, 0xc1, 0x0f, 0x2d, 0x3b, 0x69, 0xb4, 0xbe, 0xf1, + 0xdd, 0x3e, 0xcf, 0x6f, 0x66, 0x93, 0x67, 0x66, 0xe1, 0xba, 0x37, 0xda, 0xe9, 0x95, 0xdf, 0x54, + 0xee, 0xa1, 0x27, 0x5b, 0xd1, 0x4f, 0xaa, 0xbd, 0x6b, 0xb5, 0x2a, 0x19, 0xe0, 0x94, 0xed, 0xe7, + 0x57, 0x8d, 0xd6, 0x4d, 0x47, 0xd5, 0x9f, 0x6a, 0xeb, 0x8c, 0xaf, 0xdd, 0x50, 0x74, 0xf3, 0x2b, + 0x86, 0xec, 0xc3, 0xd8, 0x88, 0x08, 0x93, 0xad, 0xb4, 0x5b, 0x11, 0x15, 0xd1, 0x3c, 0x5b, 0xf2, + 0x19, 0xaf, 0x01, 0x7a, 0x69, 0x48, 0xb9, 0x4f, 0x81, 0x9c, 0x31, 0x39, 0x71, 0xf0, 0x0a, 0x32, + 0xfa, 0x31, 0xe2, 0x98, 0xf1, 0x5f, 0x03, 0x5f, 0x40, 0x62, 0x9d, 0x74, 0xde, 0x8a, 0x49, 0x11, + 0xcd, 0x2f, 0x17, 0x17, 0x25, 0xff, 0x55, 0x79, 0xcf, 0xe6, 0xf2, 0x08, 0xf1, 0x06, 0xce, 0x5b, + 0x65, 0x9d, 0x54, 0x35, 0xf1, 0x3d, 0x53, 0xbe, 0xe7, 0x1f, 0x0f, 0x05, 0xa4, 0x4e, 0xda, 0xdd, + 0x67, 0x7a, 0x10, 0x09, 0xe3, 0x51, 0x62, 0x05, 0x49, 0xab, 0x7a, 0xef, 0xac, 0x48, 0x8b, 0x68, + 0x3e, 0x5b, 0x3c, 0x2b, 0x87, 0xcc, 0xe5, 0x98, 0xb9, 0xbc, 0xe7, 0xcc, 0xcb, 0x63, 0x19, 0xbe, + 0x81, 0x54, 0x7b, 0xc7, 0x1d, 0x8f, 0xff, 0xdf, 0x31, 0xd6, 0xe1, 0x13, 0x98, 0x92, 0x31, 0xda, + 0x88, 0x8c, 0xbf, 0x3d, 0x88, 0x30, 0x30, 0x27, 0x1b, 0x2b, 0xa0, 0x88, 0xc3, 0xc0, 0xc2, 0x39, + 0x64, 0x39, 0x68, 0xb3, 0xdb, 0x74, 0xfa, 0xc0, 0x59, 0x66, 0x43, 0x96, 0x53, 0x0f, 0x9f, 0x86, + 0xb1, 0x50, 0x7f, 0xf7, 0x5e, 0x9c, 0x33, 0x3d, 0xaa, 0x97, 0x5f, 0x20, 0x19, 0x26, 0x83, 0x33, + 0x48, 0xbf, 0xa9, 0x9d, 0xd2, 0x07, 0x95, 0x3f, 0x0a, 0xe2, 0xd6, 0x90, 0x74, 0xb4, 0xce, 0x23, + 0xbc, 0x04, 0xb8, 0x53, 0x5f, 0x8d, 0x6e, 0x0c, 0x59, 0x9b, 0x9f, 0xe1, 0x05, 0x64, 0xb7, 0x7a, + 0xdf, 0x77, 0x14, 0x70, 0x8c, 0x00, 0xc9, 0x47, 0xd9, 0x76, 0xb4, 0xce, 0x27, 0xef, 0x16, 0xdf, + 0x5f, 0x37, 0xad, 0xdb, 0xfa, 0x55, 0x59, 0xeb, 0x7d, 0xb5, 0x27, 0xdb, 0xbc, 0xda, 0x68, 0xaf, + 0xd6, 0x32, 0x6c, 0xbb, 0x22, 0xd5, 0xb4, 0xea, 0xe4, 0x61, 0xf0, 0x66, 0x56, 0x09, 0xeb, 0xb7, + 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x4f, 0xd6, 0x46, 0xe1, 0x5f, 0x02, 0x00, 0x00, } diff --git a/protobuf/types/execution.proto b/protobuf/types/execution.proto index 108fc0db2..b3279e87f 100644 --- a/protobuf/types/execution.proto +++ b/protobuf/types/execution.proto @@ -30,9 +30,6 @@ message Execution { // Hash is a unique hash to identify execution. string hash = 1; - // wokflowHash is the unique hash of the workflow associated to this execution. - string workflowHash = 11; - // parentHash is the unique hash of parent execution. if execution is triggered by another one, dependency execution considered as the parent. string parentHash = 2; @@ -59,4 +56,10 @@ message Execution { // tags are optionally associated with execution by the user. repeated string tags = 10; + + // workflowHash is the unique hash of the workflow associated to this execution. + string workflowHash = 11; + + // step of the workflow. 
+ string stepID = 12; } diff --git a/protobuf/types/workflow.pb.go b/protobuf/types/workflow.pb.go index 599ebae4f..ff6334efe 100644 --- a/protobuf/types/workflow.pb.go +++ b/protobuf/types/workflow.pb.go @@ -80,7 +80,8 @@ type Workflow struct { Hash string `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` Trigger *Workflow_Trigger `protobuf:"bytes,3,opt,name=trigger,proto3" json:"trigger,omitempty"` - Tasks []*Workflow_Task `protobuf:"bytes,4,rep,name=tasks,proto3" json:"tasks,omitempty"` + Nodes []*Workflow_Node `protobuf:"bytes,4,rep,name=nodes,proto3" json:"nodes,omitempty"` + Edges []*Workflow_Edge `protobuf:"bytes,5,rep,name=edges,proto3" json:"edges,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -132,9 +133,16 @@ func (m *Workflow) GetTrigger() *Workflow_Trigger { return nil } -func (m *Workflow) GetTasks() []*Workflow_Task { +func (m *Workflow) GetNodes() []*Workflow_Node { if m != nil { - return m.Tasks + return m.Nodes + } + return nil +} + +func (m *Workflow) GetEdges() []*Workflow_Edge { + if m != nil { + return m.Edges } return nil } @@ -145,6 +153,7 @@ type Workflow_Trigger struct { InstanceHash string `protobuf:"bytes,2,opt,name=instanceHash,proto3" json:"instanceHash,omitempty"` Key string `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` Filters []*Workflow_Trigger_Filter `protobuf:"bytes,4,rep,name=filters,proto3" json:"filters,omitempty"` + NodeKey string `protobuf:"bytes,5,opt,name=nodeKey,proto3" json:"nodeKey,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -203,6 +212,13 @@ func (m *Workflow_Trigger) GetFilters() []*Workflow_Trigger_Filter { return nil } +func (m *Workflow_Trigger) GetNodeKey() string { + if m != nil { + return m.NodeKey + } + return "" +} + // Filter is applied on the data of the event/result. type Workflow_Trigger_Filter struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` @@ -259,89 +275,148 @@ func (m *Workflow_Trigger_Filter) GetValue() string { return "" } -// Definition of the task to execute when the workflow is triggered. -type Workflow_Task struct { - InstanceHash string `protobuf:"bytes,1,opt,name=instanceHash,proto3" json:"instanceHash,omitempty"` - TaskKey string `protobuf:"bytes,2,opt,name=taskKey,proto3" json:"taskKey,omitempty"` +// Definition of the node to execute when the workflow is triggered. 
+type Workflow_Node struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + InstanceHash string `protobuf:"bytes,2,opt,name=instanceHash,proto3" json:"instanceHash,omitempty"` + TaskKey string `protobuf:"bytes,3,opt,name=taskKey,proto3" json:"taskKey,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *Workflow_Task) Reset() { *m = Workflow_Task{} } -func (m *Workflow_Task) String() string { return proto.CompactTextString(m) } -func (*Workflow_Task) ProtoMessage() {} -func (*Workflow_Task) Descriptor() ([]byte, []int) { +func (m *Workflow_Node) Reset() { *m = Workflow_Node{} } +func (m *Workflow_Node) String() string { return proto.CompactTextString(m) } +func (*Workflow_Node) ProtoMessage() {} +func (*Workflow_Node) Descriptor() ([]byte, []int) { return fileDescriptor_980f671c228050a1, []int{0, 1} } -func (m *Workflow_Task) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Workflow_Task.Unmarshal(m, b) +func (m *Workflow_Node) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Workflow_Node.Unmarshal(m, b) } -func (m *Workflow_Task) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Workflow_Task.Marshal(b, m, deterministic) +func (m *Workflow_Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Workflow_Node.Marshal(b, m, deterministic) } -func (m *Workflow_Task) XXX_Merge(src proto.Message) { - xxx_messageInfo_Workflow_Task.Merge(m, src) +func (m *Workflow_Node) XXX_Merge(src proto.Message) { + xxx_messageInfo_Workflow_Node.Merge(m, src) } -func (m *Workflow_Task) XXX_Size() int { - return xxx_messageInfo_Workflow_Task.Size(m) +func (m *Workflow_Node) XXX_Size() int { + return xxx_messageInfo_Workflow_Node.Size(m) } -func (m *Workflow_Task) XXX_DiscardUnknown() { - xxx_messageInfo_Workflow_Task.DiscardUnknown(m) +func (m *Workflow_Node) XXX_DiscardUnknown() { + xxx_messageInfo_Workflow_Node.DiscardUnknown(m) } -var xxx_messageInfo_Workflow_Task proto.InternalMessageInfo +var xxx_messageInfo_Workflow_Node proto.InternalMessageInfo -func (m *Workflow_Task) GetInstanceHash() string { +func (m *Workflow_Node) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Workflow_Node) GetInstanceHash() string { if m != nil { return m.InstanceHash } return "" } -func (m *Workflow_Task) GetTaskKey() string { +func (m *Workflow_Node) GetTaskKey() string { if m != nil { return m.TaskKey } return "" } +type Workflow_Edge struct { + Src string `protobuf:"bytes,1,opt,name=src,proto3" json:"src,omitempty"` + Dst string `protobuf:"bytes,2,opt,name=dst,proto3" json:"dst,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Workflow_Edge) Reset() { *m = Workflow_Edge{} } +func (m *Workflow_Edge) String() string { return proto.CompactTextString(m) } +func (*Workflow_Edge) ProtoMessage() {} +func (*Workflow_Edge) Descriptor() ([]byte, []int) { + return fileDescriptor_980f671c228050a1, []int{0, 2} +} + +func (m *Workflow_Edge) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Workflow_Edge.Unmarshal(m, b) +} +func (m *Workflow_Edge) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Workflow_Edge.Marshal(b, m, deterministic) +} +func (m *Workflow_Edge) XXX_Merge(src proto.Message) { + xxx_messageInfo_Workflow_Edge.Merge(m, src) +} +func (m *Workflow_Edge) XXX_Size() int { 
+ return xxx_messageInfo_Workflow_Edge.Size(m) +} +func (m *Workflow_Edge) XXX_DiscardUnknown() { + xxx_messageInfo_Workflow_Edge.DiscardUnknown(m) +} + +var xxx_messageInfo_Workflow_Edge proto.InternalMessageInfo + +func (m *Workflow_Edge) GetSrc() string { + if m != nil { + return m.Src + } + return "" +} + +func (m *Workflow_Edge) GetDst() string { + if m != nil { + return m.Dst + } + return "" +} + func init() { proto.RegisterEnum("types.Workflow_Trigger_Type", Workflow_Trigger_Type_name, Workflow_Trigger_Type_value) proto.RegisterEnum("types.Workflow_Trigger_Filter_Predicate", Workflow_Trigger_Filter_Predicate_name, Workflow_Trigger_Filter_Predicate_value) proto.RegisterType((*Workflow)(nil), "types.Workflow") proto.RegisterType((*Workflow_Trigger)(nil), "types.Workflow.Trigger") proto.RegisterType((*Workflow_Trigger_Filter)(nil), "types.Workflow.Trigger.Filter") - proto.RegisterType((*Workflow_Task)(nil), "types.Workflow.Task") + proto.RegisterType((*Workflow_Node)(nil), "types.Workflow.Node") + proto.RegisterType((*Workflow_Edge)(nil), "types.Workflow.Edge") } func init() { proto.RegisterFile("protobuf/types/workflow.proto", fileDescriptor_980f671c228050a1) } var fileDescriptor_980f671c228050a1 = []byte{ - // 380 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xcd, 0xca, 0xd3, 0x40, - 0x14, 0x75, 0xf2, 0x6b, 0x6e, 0xa5, 0x84, 0xa1, 0x60, 0x28, 0x2a, 0x21, 0xab, 0x50, 0x30, 0xa9, - 0x71, 0xe3, 0x5a, 0x6c, 0x11, 0xdc, 0x68, 0xa8, 0x08, 0xee, 0xd2, 0x76, 0x92, 0x86, 0xa4, 0x33, - 0x21, 0x33, 0x69, 0xe9, 0xab, 0xf8, 0x4c, 0x3e, 0x8e, 0x0f, 0x20, 0x99, 0x74, 0x5a, 0x4a, 0xbf, - 0x7e, 0xbb, 0xb9, 0xe7, 0x9e, 0x39, 0xf7, 0xdc, 0x1f, 0x78, 0xdb, 0xb4, 0x4c, 0xb0, 0x75, 0x97, - 0xc7, 0xe2, 0xd4, 0x10, 0x1e, 0x1f, 0x59, 0x5b, 0xe5, 0x35, 0x3b, 0x46, 0x12, 0xc7, 0xa6, 0x44, - 0x83, 0xbf, 0x06, 0xbc, 0xfc, 0x75, 0xce, 0x60, 0x0c, 0xc6, 0x2e, 0xe3, 0x3b, 0x0f, 0xf9, 0x28, - 0x74, 0x52, 0xf9, 0xc6, 0x2e, 0xe8, 0x15, 0x39, 0x79, 0x9a, 0x84, 0xfa, 0x27, 0xfe, 0x00, 0xb6, - 0x68, 0xcb, 0xa2, 0x20, 0xad, 0xa7, 0xfb, 0x28, 0x1c, 0x25, 0xaf, 0x23, 0xa9, 0x15, 0x29, 0x9d, - 0x68, 0x35, 0xa4, 0x53, 0xc5, 0xc3, 0x33, 0x30, 0x45, 0xc6, 0x2b, 0xee, 0x19, 0xbe, 0x1e, 0x8e, - 0x92, 0xc9, 0xdd, 0x87, 0x8c, 0x57, 0xe9, 0x40, 0x99, 0xfe, 0xd3, 0xc0, 0x3e, 0x0b, 0xe0, 0x39, - 0x18, 0x3d, 0x53, 0x1a, 0x1a, 0x27, 0x6f, 0x1e, 0xd4, 0x89, 0x56, 0xa7, 0x86, 0xa4, 0x92, 0x89, - 0x03, 0x78, 0x55, 0x52, 0x2e, 0x32, 0xba, 0x21, 0x5f, 0xfb, 0x56, 0x06, 0xdf, 0x37, 0x98, 0x6a, - 0x49, 0xbf, 0xb6, 0xf4, 0x09, 0xec, 0xbc, 0xac, 0x05, 0x69, 0x95, 0xc3, 0x77, 0x8f, 0x4a, 0x2d, - 0x25, 0x2d, 0x55, 0xf4, 0xe9, 0x1f, 0x04, 0xd6, 0x80, 0x29, 0x59, 0x74, 0x95, 0x5d, 0x82, 0xd3, - 0xb4, 0x64, 0x5b, 0x6e, 0x32, 0x41, 0xa4, 0x93, 0x71, 0x12, 0x3e, 0x2f, 0x1c, 0x7d, 0x57, 0xfc, - 0xf4, 0xfa, 0x15, 0x4f, 0xc0, 0x3c, 0x64, 0x75, 0x47, 0xce, 0x96, 0x87, 0x20, 0xf0, 0xc1, 0xb9, - 0xb0, 0xf1, 0x08, 0xec, 0x9f, 0xb4, 0xa2, 0xec, 0x48, 0xdd, 0x17, 0xd8, 0x02, 0x6d, 0xf1, 0xc3, - 0x45, 0xc1, 0x0c, 0x8c, 0x7e, 0x34, 0xb7, 0x49, 0x07, 0xcc, 0xc5, 0x81, 0x50, 0xe1, 0x22, 0x0c, - 0x60, 0xa5, 0x84, 0x77, 0xb5, 0x70, 0xb5, 0xe9, 0x17, 0x30, 0xfa, 0x2d, 0xdc, 0x0d, 0x10, 0x3d, - 0x31, 0x40, 0x0f, 0xec, 0x7e, 0x57, 0xdf, 0x2e, 0x77, 0xa1, 0xc2, 0xcf, 0xc9, 0xef, 0x79, 0x51, - 0x8a, 0x5d, 0xb7, 0x8e, 0x36, 0x6c, 0x1f, 0xef, 0x09, 0x2f, 0xde, 0xe7, 0xac, 0xa3, 0xdb, 0x4c, - 0x94, 0x8c, 0xc6, 0x84, 0x16, 0x25, 0x25, 0xf1, 0xed, 0x61, 0xae, 0x2d, 0x19, 0x7f, 0xfc, 0x1f, - 0x00, 0x00, 0xff, 
0xff, 0xe0, 0x2b, 0xd9, 0xd4, 0xb1, 0x02, 0x00, 0x00, + // 432 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xc1, 0x8a, 0xdb, 0x30, + 0x10, 0x86, 0xeb, 0xd8, 0x8e, 0x9b, 0x49, 0x59, 0x8c, 0x58, 0xa8, 0x08, 0x6d, 0x09, 0x39, 0x85, + 0x40, 0xed, 0xad, 0x7b, 0xe9, 0xb9, 0x90, 0xa5, 0xb0, 0x50, 0x5a, 0xb1, 0x6d, 0xa1, 0x37, 0xc7, + 0x9a, 0x38, 0x26, 0x59, 0x29, 0x48, 0xf2, 0x86, 0xbc, 0x44, 0x1f, 0xa0, 0x4f, 0xd5, 0x47, 0x2a, + 0x92, 0xa3, 0x0d, 0x29, 0x9b, 0xb2, 0x37, 0xcd, 0xcc, 0xa7, 0xf9, 0xe7, 0x1f, 0xd9, 0xf0, 0x7a, + 0xab, 0xa4, 0x91, 0x8b, 0x76, 0x99, 0x9b, 0xfd, 0x16, 0x75, 0xbe, 0x93, 0x6a, 0xbd, 0xdc, 0xc8, + 0x5d, 0xe6, 0xf2, 0x24, 0x76, 0xd9, 0xc9, 0x9f, 0x18, 0x9e, 0xff, 0x38, 0x54, 0x08, 0x81, 0x68, + 0x55, 0xea, 0x15, 0x0d, 0xc6, 0xc1, 0x74, 0xc0, 0xdc, 0x99, 0xa4, 0x10, 0xae, 0x71, 0x4f, 0x7b, + 0x2e, 0x65, 0x8f, 0xe4, 0x1d, 0x24, 0x46, 0x35, 0x75, 0x8d, 0x8a, 0x86, 0xe3, 0x60, 0x3a, 0x2c, + 0x5e, 0x66, 0xae, 0x57, 0xe6, 0xfb, 0x64, 0xb7, 0x5d, 0x99, 0x79, 0x8e, 0xcc, 0x20, 0x16, 0x92, + 0xa3, 0xa6, 0xd1, 0x38, 0x9c, 0x0e, 0x8b, 0xcb, 0x7f, 0x2f, 0x7c, 0x96, 0x1c, 0x59, 0x87, 0x58, + 0x16, 0x79, 0x8d, 0x9a, 0xc6, 0x8f, 0xb3, 0x73, 0x5e, 0x23, 0xeb, 0x90, 0xd1, 0xaf, 0x10, 0x92, + 0x83, 0x18, 0xb9, 0x82, 0xc8, 0x92, 0x6e, 0xf8, 0x8b, 0xe2, 0xd5, 0x99, 0x99, 0xb2, 0xdb, 0xfd, + 0x16, 0x99, 0x23, 0xc9, 0x04, 0x5e, 0x34, 0x42, 0x9b, 0x52, 0x54, 0xf8, 0xc9, 0xda, 0xee, 0x3c, + 0x9e, 0xe4, 0xbc, 0xfd, 0xf0, 0x68, 0xff, 0x03, 0x24, 0xcb, 0x66, 0x63, 0x50, 0x79, 0x37, 0x6f, + 0xce, 0x49, 0x5d, 0x3b, 0x8c, 0x79, 0x9c, 0x50, 0x48, 0xac, 0xc5, 0x1b, 0xdc, 0xd3, 0xd8, 0xf5, + 0xf3, 0xe1, 0xe8, 0x77, 0x00, 0xfd, 0x8e, 0xf6, 0x82, 0xc1, 0x51, 0xf0, 0x1a, 0x06, 0x5b, 0x85, + 0xbc, 0xa9, 0x4a, 0x83, 0x6e, 0xc6, 0x8b, 0x62, 0xfa, 0x7f, 0xc9, 0xec, 0x8b, 0xe7, 0xd9, 0xf1, + 0x2a, 0xb9, 0x84, 0xf8, 0xbe, 0xdc, 0xb4, 0x78, 0x30, 0xd3, 0x05, 0x93, 0x31, 0x0c, 0x1e, 0x68, + 0x32, 0x84, 0xe4, 0x9b, 0x58, 0x0b, 0xb9, 0x13, 0xe9, 0x33, 0xd2, 0x87, 0xde, 0xfc, 0x6b, 0x1a, + 0x4c, 0x66, 0x10, 0xd9, 0xa5, 0x9d, 0x16, 0x07, 0x10, 0xcf, 0xef, 0x51, 0x98, 0x34, 0x20, 0x00, + 0x7d, 0x86, 0xba, 0xdd, 0x98, 0xb4, 0x37, 0xfa, 0x0e, 0x91, 0x7d, 0xcb, 0x47, 0x5c, 0x3c, 0x65, + 0xd9, 0x14, 0x12, 0x53, 0xea, 0xf5, 0xcd, 0xc3, 0xc2, 0x7d, 0x38, 0x9a, 0x41, 0x64, 0xdf, 0xdd, + 0xf6, 0xd5, 0xaa, 0xf2, 0x7d, 0xb5, 0xaa, 0x6c, 0x86, 0x6b, 0xe3, 0xbf, 0x4f, 0xae, 0xcd, 0xc7, + 0xe2, 0xe7, 0x55, 0xdd, 0x98, 0x55, 0xbb, 0xc8, 0x2a, 0x79, 0x97, 0xdf, 0xa1, 0xae, 0xdf, 0x2e, + 0x65, 0x2b, 0x78, 0x69, 0x1a, 0x29, 0x72, 0x14, 0x75, 0x23, 0x30, 0x3f, 0xfd, 0x39, 0x16, 0x7d, + 0x17, 0xbf, 0xff, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x3d, 0x35, 0x45, 0x22, 0x35, 0x03, 0x00, 0x00, } diff --git a/protobuf/types/workflow.proto b/protobuf/types/workflow.proto index 6b185d3dd..8b08b73ac 100644 --- a/protobuf/types/workflow.proto +++ b/protobuf/types/workflow.proto @@ -30,20 +30,28 @@ message Workflow { string value = 3; // Value of the filter } - Type type = 1; // Type of trigger. + Type type = 1; // Type of trigger. string instanceHash = 2; // Hash of the instance that triggers the workflow. string key = 3; // Key that triggers the workflow (event key or task key). repeated Filter filters = 4; // List of filters to apply on the data of the event/result. + string nodeKey = 5; // First node to trigger when the workflow starts. } - // Definition of the task to execute when the workflow is triggered. - message Task { - string instanceHash = 1; // Hash of the instance to execute. 
- string taskKey = 2; // Task of the instance to execute. + // Definition of the node to execute when the workflow is triggered. + message Node { + string key = 1; // Key that identifies the node. + string instanceHash = 2; // Hash of the instance to execute. + string taskKey = 3; // Task of the instance to execute. + } + + message Edge { + string src = 1; // Source of the edge. + string dst = 2; // Destination of the edge. } string hash = 1; // Workflow's hash string key = 2; // Workflow's key Trigger trigger = 3; // Trigger for the workflow. - repeated Task tasks = 4; // Task to execute when the trigger is valid. + repeated Node nodes = 4; // Nodes with information related to the execution to trigger. + repeated Edge edges = 5; // Edges to create the link between the nodes. } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a44bfb010..f1f94301e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -48,13 +48,13 @@ func (s *Scheduler) Start() error { case event := <-s.eventStream.C: go s.processTrigger(workflow.EVENT, event.InstanceHash, event.Key, event.Data, event.Hash, nil) case execution := <-s.executionStream.C: - go s.processTrigger(workflow.RESULT, execution.InstanceHash, execution.TaskKey, execution.Outputs, nil, execution) + go s.processTrigger(workflow.RESULT, execution.InstanceHash, execution.TaskKey, execution.Outputs, nil, execution.Hash) go s.processExecution(execution) } } } -func (s *Scheduler) processTrigger(trigger workflow.TriggerType, instanceHash hash.Hash, key string, data map[string]interface{}, eventHash hash.Hash, exec *execution.Execution) { +func (s *Scheduler) processTrigger(trigger workflow.TriggerType, instanceHash hash.Hash, key string, data map[string]interface{}, eventHash hash.Hash, execHash hash.Hash) { workflows, err := s.workflow.List() if err != nil { s.ErrC <- err @@ -62,66 +62,35 @@ func (s *Scheduler) processTrigger(trigger workflow.TriggerType, instanceHash ha } for _, wf := range workflows { if wf.Trigger.Match(trigger, instanceHash, key, data) { - if err := s.triggerExecution(wf, exec, eventHash, data); err != nil { + nextStep, err := wf.FindNode(wf.Trigger.NodeKey) + if err != nil { + s.ErrC <- err + continue + } + if _, err := s.execution.Execute(wf.Hash, nextStep.InstanceHash, eventHash, execHash, wf.Trigger.NodeKey, nextStep.TaskKey, data, []string{}); err != nil { s.ErrC <- err } } } } -func (s *Scheduler) processExecution(exec *execution.Execution) error { +func (s *Scheduler) processExecution(exec *execution.Execution) { if exec.WorkflowHash.IsZero() { - return nil + return } wf, err := s.workflow.Get(exec.WorkflowHash) if err != nil { - return err - } - return s.triggerExecution(wf, exec, nil, exec.Outputs) -} - -func (s *Scheduler) triggerExecution(wf *workflow.Workflow, prev *execution.Execution, eventHash hash.Hash, data map[string]interface{}) error { - height, err := s.getHeight(wf, prev) - if err != nil { - return err - } - if len(wf.Tasks) <= height { - // end of workflow - return nil - } - var parentHash hash.Hash - if prev != nil { - parentHash = prev.Hash - } - task := wf.Tasks[height] - if _, err := s.execution.Execute(wf.Hash, task.InstanceHash, eventHash, parentHash, task.TaskKey, data, []string{}); err != nil { - return err - } - return nil -} - -func (s *Scheduler) getHeight(wf *workflow.Workflow, exec *execution.Execution) (int, error) { - if exec == nil { - return 0, nil - } - // Result from other workflow - if !exec.WorkflowHash.Equal(wf.Hash) { - return 0, nil - } - // Execution 
triggered by an event - if !exec.EventHash.IsZero() { - return 1, nil - } - if exec.ParentHash.IsZero() { - panic("parent hash should be present if event is not") - } - if exec.ParentHash.Equal(exec.Hash) { - panic("parent hash cannot be equal to execution hash") + s.ErrC <- err + return } - parent, err := s.execution.Get(exec.ParentHash) - if err != nil { - return 0, err + for _, nextStepID := range wf.ChildrenIDs(exec.StepID) { + nextStep, err := wf.FindNode(nextStepID) + if err != nil { + s.ErrC <- err + continue + } + if _, err := s.execution.Execute(wf.Hash, nextStep.InstanceHash, nil, exec.Hash, nextStepID, nextStep.TaskKey, exec.Outputs, []string{}); err != nil { + s.ErrC <- err + } } - parentHeight, err := s.getHeight(wf, parent) - return parentHeight + 1, err } diff --git a/sdk/execution/execution.go b/sdk/execution/execution.go index 4f55192f1..44396d91f 100644 --- a/sdk/execution/execution.go +++ b/sdk/execution/execution.go @@ -121,7 +121,7 @@ func (e *Execution) validateExecutionOutput(instanceHash hash.Hash, taskKey stri } // Execute executes a task tasKey with inputData and tags for service serviceID. -func (e *Execution) Execute(workflowHash, instanceHash, eventHash, parentHash hash.Hash, taskKey string, inputData map[string]interface{}, tags []string) (executionHash hash.Hash, err error) { +func (e *Execution) Execute(workflowHash, instanceHash, eventHash, parentHash hash.Hash, stepID string, taskKey string, inputData map[string]interface{}, tags []string) (executionHash hash.Hash, err error) { if parentHash != nil && eventHash != nil { return nil, fmt.Errorf("cannot have both parent and event hash") } @@ -150,7 +150,7 @@ func (e *Execution) Execute(workflowHash, instanceHash, eventHash, parentHash ha return nil, err } - exec := execution.New(workflowHash, instance.Hash, parentHash, eventHash, taskKey, inputData, tags) + exec := execution.New(workflowHash, instance.Hash, parentHash, eventHash, stepID, taskKey, inputData, tags) if err := exec.Execute(); err != nil { return nil, err } diff --git a/sdk/execution/execution_test.go b/sdk/execution/execution_test.go index b1756681c..48f0a739a 100644 --- a/sdk/execution/execution_test.go +++ b/sdk/execution/execution_test.go @@ -91,7 +91,7 @@ var testInstance = &instance.Instance{ func TestGet(t *testing.T) { sdk, at := newTesting(t) defer at.close() - exec := execution.New(nil, nil, nil, nil, "", nil, nil) + exec := execution.New(nil, nil, nil, nil, "", "", nil, nil) require.NoError(t, sdk.execDB.Save(exec)) got, err := sdk.Get(exec.Hash) require.NoError(t, err) @@ -102,7 +102,7 @@ func TestGetStream(t *testing.T) { sdk, at := newTesting(t) defer at.close() - exec := execution.New(nil, nil, nil, nil, "", nil, nil) + exec := execution.New(nil, nil, nil, nil, "", "", nil, nil) exec.Status = execution.InProgress require.NoError(t, sdk.execDB.Save(exec)) @@ -123,11 +123,11 @@ func TestExecute(t *testing.T) { require.NoError(t, at.serviceDB.Save(testService)) require.NoError(t, at.instanceDB.Save(testInstance)) - _, err := sdk.Execute(nil, testInstance.Hash, hash.Int(1), nil, testService.Tasks[0].Key, map[string]interface{}{}, nil) + _, err := sdk.Execute(nil, testInstance.Hash, hash.Int(1), nil, "", testService.Tasks[0].Key, map[string]interface{}{}, nil) require.NoError(t, err) // not existing instance - _, err = sdk.Execute(nil, hash.Int(3), hash.Int(1), nil, testService.Tasks[0].Key, map[string]interface{}{}, nil) + _, err = sdk.Execute(nil, hash.Int(3), hash.Int(1), nil, "", testService.Tasks[0].Key, map[string]interface{}{}, 
nil) require.Error(t, err) } @@ -137,7 +137,7 @@ func TestExecuteInvalidTaskKey(t *testing.T) { require.NoError(t, at.serviceDB.Save(testService)) - _, err := sdk.Execute(nil, testService.Hash, hash.Int(1), nil, "-", map[string]interface{}{}, nil) + _, err := sdk.Execute(nil, testService.Hash, hash.Int(1), nil, "", "-", map[string]interface{}{}, nil) require.Error(t, err) } diff --git a/sdk/workflow/workflow.go b/sdk/workflow/workflow.go index 43bae5ed6..ce7e910c0 100644 --- a/sdk/workflow/workflow.go +++ b/sdk/workflow/workflow.go @@ -31,8 +31,8 @@ func (w *Workflow) Create(wf *workflow.Workflow) (*workflow.Workflow, error) { if _, err := w.instance.Get(wf.Trigger.InstanceHash); err != nil { return nil, err } - for _, task := range wf.Tasks { - if _, err := w.instance.Get(task.InstanceHash); err != nil { + for _, node := range wf.Nodes { + if _, err := w.instance.Get(node.InstanceHash); err != nil { return nil, err } } diff --git a/server/grpc/api/execution.go b/server/grpc/api/execution.go index 09e70304a..178ff7408 100644 --- a/server/grpc/api/execution.go +++ b/server/grpc/api/execution.go @@ -43,7 +43,7 @@ func (s *ExecutionServer) Create(ctx context.Context, req *api.CreateExecutionRe if err != nil { return nil, err } - executionHash, err := s.sdk.Execution.Execute(nil, instanceHash, eventHash, nil, req.TaskKey, inputs, req.Tags) + executionHash, err := s.sdk.Execution.Execute(nil, instanceHash, eventHash, nil, "", req.TaskKey, inputs, req.Tags) if err != nil { return nil, err } @@ -162,5 +162,6 @@ func toProtoExecution(exec *execution.Execution) (*types.Execution, error) { Outputs: outputs, Tags: exec.Tags, Error: exec.Error, + StepID: exec.StepID, }, nil } diff --git a/server/grpc/api/execution_test.go b/server/grpc/api/execution_test.go index 73f0996cf..30a47b2f4 100644 --- a/server/grpc/api/execution_test.go +++ b/server/grpc/api/execution_test.go @@ -20,7 +20,7 @@ func TestGet(t *testing.T) { defer db.Close() defer os.RemoveAll(execdbname) - exec := execution.New(nil, nil, nil, nil, "", nil, nil) + exec := execution.New(nil, nil, nil, nil, "", "", nil, nil) require.NoError(t, db.Save(exec)) want, err := toProtoExecution(exec) @@ -40,7 +40,7 @@ func TestUpdate(t *testing.T) { defer db.Close() defer os.RemoveAll(execdbname) - exec := execution.New(nil, nil, nil, nil, "", nil, nil) + exec := execution.New(nil, nil, nil, nil, "", "", nil, nil) require.NoError(t, db.Save(exec)) sdk := sdk.New(nil, nil, nil, db, nil, "", "") diff --git a/server/grpc/api/workflow.go b/server/grpc/api/workflow.go index 26b4f786e..534677db5 100644 --- a/server/grpc/api/workflow.go +++ b/server/grpc/api/workflow.go @@ -25,16 +25,17 @@ func (s *WorkflowServer) Create(ctx context.Context, req *api.CreateWorkflowRequ wf, err := fromProtoWorkflow(&types.Workflow{ Key: req.Key, Trigger: req.Trigger, - Tasks: req.Tasks, + Nodes: req.Nodes, + Edges: req.Edges, }) if err != nil { return nil, err } - srv, err := s.sdk.Workflow.Create(wf) + wf, err = s.sdk.Workflow.Create(wf) if err != nil { return nil, err } - return &api.CreateWorkflowResponse{Hash: srv.Hash.String()}, nil + return &api.CreateWorkflowResponse{Hash: wf.Hash.String()}, nil } // Delete deletes service by hash or sid. 
@@ -89,21 +90,33 @@ func fromProtoFilters(filters []*types.Workflow_Trigger_Filter) []*workflow.Trig return fs } -func fromProtoWorkflowTasks(tasks []*types.Workflow_Task) ([]*workflow.Task, error) { - res := make([]*workflow.Task, len(tasks)) - for i, task := range tasks { - instanceHash, err := hash.Decode(task.InstanceHash) +func fromProtoWorkflowNodes(nodes []*types.Workflow_Node) ([]workflow.Node, error) { + res := make([]workflow.Node, len(nodes)) + for i, node := range nodes { + instanceHash, err := hash.Decode(node.InstanceHash) if err != nil { return nil, err } - res[i] = &workflow.Task{ + res[i] = workflow.Node{ + Key: node.Key, InstanceHash: instanceHash, - TaskKey: task.TaskKey, + TaskKey: node.TaskKey, } } return res, nil } +func fromProtoWorkflowEdges(edges []*types.Workflow_Edge) []workflow.Edge { + res := make([]workflow.Edge, len(edges)) + for i, edge := range edges { + res[i] = workflow.Edge{ + Src: edge.Src, + Dst: edge.Dst, + } + } + return res +} + func fromProtoWorkflow(wf *types.Workflow) (*workflow.Workflow, error) { var triggerType workflow.TriggerType switch wf.Trigger.Type { @@ -116,19 +129,21 @@ func fromProtoWorkflow(wf *types.Workflow) (*workflow.Workflow, error) { if err != nil { return nil, err } - tasks, err := fromProtoWorkflowTasks(wf.Tasks) + nodes, err := fromProtoWorkflowNodes(wf.Nodes) if err != nil { return nil, err } return &workflow.Workflow{ Key: wf.Key, - Trigger: &workflow.Trigger{ + Trigger: workflow.Trigger{ Type: triggerType, InstanceHash: instanceHash, Key: wf.Trigger.Key, + NodeKey: wf.Trigger.NodeKey, Filters: fromProtoFilters(wf.Trigger.Filters), }, - Tasks: tasks, + Nodes: nodes, + Edges: fromProtoWorkflowEdges(wf.Edges), }, nil } @@ -149,12 +164,24 @@ func toProtoFilters(filters []*workflow.TriggerFilter) []*types.Workflow_Trigger return fs } -func toProtoWorkflowTasks(tasks []*workflow.Task) []*types.Workflow_Task { - res := make([]*types.Workflow_Task, len(tasks)) - for i, task := range tasks { - res[i] = &types.Workflow_Task{ - InstanceHash: task.InstanceHash.String(), - TaskKey: task.TaskKey, +func toProtoWorkflowNodes(nodes []workflow.Node) []*types.Workflow_Node { + res := make([]*types.Workflow_Node, len(nodes)) + for i, node := range nodes { + res[i] = &types.Workflow_Node{ + Key: node.Key, + InstanceHash: node.InstanceHash.String(), + TaskKey: node.TaskKey, + } + } + return res +} + +func toProtoWorkflowEdges(edges []workflow.Edge) []*types.Workflow_Edge { + res := make([]*types.Workflow_Edge, len(edges)) + for i, edge := range edges { + res[i] = &types.Workflow_Edge{ + Src: edge.Src, + Dst: edge.Dst, } } return res @@ -176,8 +203,10 @@ func toProtoWorkflow(wf *workflow.Workflow) *types.Workflow { InstanceHash: wf.Trigger.InstanceHash.String(), Key: wf.Trigger.Key, Filters: toProtoFilters(wf.Trigger.Filters), + NodeKey: wf.Trigger.NodeKey, }, - Tasks: toProtoWorkflowTasks(wf.Tasks), + Nodes: toProtoWorkflowNodes(wf.Nodes), + Edges: toProtoWorkflowEdges(wf.Edges), } } diff --git a/workflow/type.go b/workflow/type.go index 816358a91..e52429988 100644 --- a/workflow/type.go +++ b/workflow/type.go @@ -22,23 +22,32 @@ const ( // Workflow describes a workflow of a service type Workflow struct { Hash hash.Hash `hash:"-" validate:"required"` - Trigger *Trigger `hash:"name:1" validate:"required"` - Tasks []*Task `hash:"name:2" validate:"required"` - Key string `hash:"name:3" validate:"required"` + Key string `hash:"name:1" validate:"required"` + Trigger Trigger `hash:"name:2" validate:"required"` + Nodes []Node `hash:"name:3" 
validate:"dive,required"` + Edges []Edge `hash:"name:4" validate:"dive,required"` } -// Task describes the instructions for the workflow to execute a task -type Task struct { - InstanceHash hash.Hash `hash:"name:1" validate:"required"` - TaskKey string `hash:"name:2" validate:"printascii"` +// Node describes the instructions for the workflow to execute a task +type Node struct { + Key string `hash:"name:1" validate:"required"` + InstanceHash hash.Hash `hash:"name:2" validate:"required"` + TaskKey string `hash:"name:3" validate:"required,printascii"` +} + +// Edge describes the instructions for the workflow to execute a task +type Edge struct { + Src string `hash:"name:1" validate:"required"` + Dst string `hash:"name:2" validate:"required"` } // Trigger is an event that triggers a workflow type Trigger struct { InstanceHash hash.Hash `hash:"name:1" validate:"required"` - Key string `hash:"name:2" validate:"printascii"` + Key string `hash:"name:2" validate:"required,printascii"` Type TriggerType `hash:"name:3" validate:"required"` Filters []*TriggerFilter `hash:"name:4" validate:"dive,required"` + NodeKey string `hash:"name:5" validate:"required"` } // TriggerFilter is the filter definition that can be applied to a workflow trigger diff --git a/workflow/workflow.go b/workflow/workflow.go index fe32b421d..4a6c8497a 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -1,6 +1,10 @@ package workflow -import "github.com/mesg-foundation/engine/hash" +import ( + "fmt" + + "github.com/mesg-foundation/engine/hash" +) // Match returns true if a workflow trigger is matching the given parameters func (t *Trigger) Match(trigger TriggerType, instanceHash hash.Hash, key string, data map[string]interface{}) bool { @@ -33,3 +37,24 @@ func (f *TriggerFilter) Match(inputs map[string]interface{}) bool { return false } } + +// ChildrenIDs returns the list of node IDs with a dependency to the current node +func (w Workflow) ChildrenIDs(nodeKey string) []string { + nodeKeys := make([]string, 0) + for _, edge := range w.Edges { + if edge.Src == nodeKey { + nodeKeys = append(nodeKeys, edge.Dst) + } + } + return nodeKeys +} + +// FindNode returns the node matching the key in parameter or an error if not found +func (w Workflow) FindNode(key string) (Node, error) { + for _, node := range w.Nodes { + if node.Key == key { + return node, nil + } + } + return Node{}, fmt.Errorf("%q not found", key) +}