diff --git a/.changeset/kind-cobras-hope.md b/.changeset/kind-cobras-hope.md new file mode 100644 index 00000000000..deb4f5aeea1 --- /dev/null +++ b/.changeset/kind-cobras-hope.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal fix for workflow step persistence diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index cfa15905f29..b5adf3e3df2 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -398,7 +398,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event v ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ workflows.KeywordTrigger: { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: event, }, Status: store.StatusCompleted, @@ -547,7 +547,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { l.Debugw("executing on a step event") stepState := &store.WorkflowExecutionStep{ - Outputs: &store.StepOutput{}, + Outputs: store.StepOutput{}, ExecutionID: msg.state.ExecutionID, Ref: msg.stepRef, } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index aebb5c2066f..5ad2fee630c 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -544,7 +544,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ workflows.KeywordTrigger: { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: resp, }, Status: store.StatusCompleted, @@ -599,7 +599,7 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) { ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ workflows.KeywordTrigger: { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: resp, }, Status: store.StatusCompleted, diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index 218022eae36..6fc61af3954 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -29,7 +29,7 @@ func copyState(es store.WorkflowExecution) store.WorkflowExecution { Ref: step.Ref, Status: step.Status, - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Err: step.Outputs.Err, Value: copiedov, }, diff --git a/core/services/workflows/state_test.go b/core/services/workflows/state_test.go index ccd6cd5004d..a9829a97c74 100644 --- a/core/services/workflows/state_test.go +++ b/core/services/workflows/state_test.go @@ -38,7 +38,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: values.NewString(""), }, }, @@ -68,7 +68,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: values.NewString(""), }, }, @@ -82,7 +82,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Err: errors.New("catastrophic error"), }, }, @@ -96,7 +96,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -110,7 +110,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -124,7 +124,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -138,7 +138,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -152,7 +152,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -166,7 +166,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -180,7 +180,7 @@ func TestInterpolateKey(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: val, }, }, @@ -222,7 +222,7 @@ func TestInterpolateInputsFromState(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: values.NewString(""), }, }, @@ -242,7 +242,7 @@ func TestInterpolateInputsFromState(t *testing.T) { state: store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ "evm_median": { - Outputs: &store.StepOutput{ + Outputs: store.StepOutput{ Value: values.NewString(""), }, }, diff --git a/core/services/workflows/store/models.go b/core/services/workflows/store/models.go index 29a1df154de..27604543ede 100644 --- a/core/services/workflows/store/models.go +++ b/core/services/workflows/store/models.go @@ -24,7 +24,7 @@ type WorkflowExecutionStep struct { Status string Inputs *values.Map - Outputs *StepOutput + Outputs StepOutput UpdatedAt *time.Time } diff --git a/core/services/workflows/store/store_db.go b/core/services/workflows/store/store_db.go index 73acece5b18..e9204efd7b1 100644 --- a/core/services/workflows/store/store_db.go +++ b/core/services/workflows/store/store_db.go @@ -147,20 +147,15 @@ func stepToState(step workflowStepRow) (*WorkflowExecutionStep, error) { outputs = values.FromProto(vProto) } - var so *StepOutput - if outputErr != nil || outputs != nil { - so = &StepOutput{ - Err: outputErr, - Value: outputs, - } - } - return &WorkflowExecutionStep{ ExecutionID: step.WorkflowExecutionID, Ref: step.Ref, Status: step.Status, Inputs: inputs, - Outputs: so, + Outputs: StepOutput{ + Err: outputErr, + Value: outputs, + }, }, nil } @@ -182,10 +177,6 @@ func stateToStep(state *WorkflowExecutionStep) (workflowStepRow, error) { Inputs: inpb, } - if state.Outputs == nil { - return wsr, nil - } - if state.Outputs.Value != nil { p := values.Proto(state.Outputs.Value) ob, err := proto.Marshal(p) diff --git a/core/services/workflows/store/store_db_test.go b/core/services/workflows/store/store_db_test.go index e41f4857363..e30eda1bfc6 100644 --- a/core/services/workflows/store/store_db_test.go +++ b/core/services/workflows/store/store_db_test.go @@ -153,7 +153,7 @@ func Test_StoreDB_UpdateStep(t *testing.T) { require.NoError(t, err) stepOne.Inputs = nm - stepOne.Outputs = &StepOutput{Err: errors.New("some error")} + stepOne.Outputs = StepOutput{Err: errors.New("some error")} es, err = store.UpsertStep(tests.Context(t), stepOne) require.NoError(t, err) @@ -161,7 +161,7 @@ func Test_StoreDB_UpdateStep(t *testing.T) { gotStep := es.Steps[stepOne.Ref] assert.Equal(t, stepOne, gotStep) - stepTwo.Outputs = &StepOutput{Value: nm} + stepTwo.Outputs = StepOutput{Value: nm} es, err = store.UpsertStep(tests.Context(t), stepTwo) require.NoError(t, err)