Skip to content

Commit

Permalink
move the step output to a value type, fixes nil pointer exception bug…
Browse files Browse the repository at this point in the history
… in step persistence code (#13416)
  • Loading branch information
ettec authored Jun 4, 2024
1 parent fc007a9 commit 68e00b3
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/kind-cobras-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal fix for workflow step persistence
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event v
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: event,
},
Status: store.StatusCompleted,
Expand Down Expand Up @@ -562,7 +562,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {

l.Debugw("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Outputs: &store.StepOutput{},
Outputs: store.StepOutput{},
ExecutionID: msg.state.ExecutionID,
Ref: msg.stepRef,
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) {
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: resp,
},
Status: store.StatusCompleted,
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) {
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: resp,
},
Status: store.StatusCompleted,
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func copyState(es store.WorkflowExecution) store.WorkflowExecution {
Ref: step.Ref,
Status: step.Status,

Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Err: step.Outputs.Err,
Value: copiedov,
},
Expand Down
24 changes: 12 additions & 12 deletions core/services/workflows/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: values.NewString("<a report>"),
},
},
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: values.NewString("<a report>"),
},
},
Expand All @@ -82,7 +82,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Err: errors.New("catastrophic error"),
},
},
Expand All @@ -96,7 +96,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -110,7 +110,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -124,7 +124,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -138,7 +138,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -152,7 +152,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -166,7 +166,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand All @@ -180,7 +180,7 @@ func TestInterpolateKey(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: val,
},
},
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestInterpolateInputsFromState(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: values.NewString("<a report>"),
},
},
Expand All @@ -242,7 +242,7 @@ func TestInterpolateInputsFromState(t *testing.T) {
state: store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
"evm_median": {
Outputs: &store.StepOutput{
Outputs: store.StepOutput{
Value: values.NewString("<a report>"),
},
},
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type WorkflowExecutionStep struct {
Status string

Inputs *values.Map
Outputs *StepOutput
Outputs StepOutput

UpdatedAt *time.Time
}
Expand Down
17 changes: 4 additions & 13 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,15 @@ func stepToState(step workflowStepRow) (*WorkflowExecutionStep, error) {
outputs = values.FromProto(vProto)
}

var so *StepOutput
if outputErr != nil || outputs != nil {
so = &StepOutput{
Err: outputErr,
Value: outputs,
}
}

return &WorkflowExecutionStep{
ExecutionID: step.WorkflowExecutionID,
Ref: step.Ref,
Status: step.Status,
Inputs: inputs,
Outputs: so,
Outputs: StepOutput{
Err: outputErr,
Value: outputs,
},
}, nil
}

Expand All @@ -182,10 +177,6 @@ func stateToStep(state *WorkflowExecutionStep) (workflowStepRow, error) {
Inputs: inpb,
}

if state.Outputs == nil {
return wsr, nil
}

if state.Outputs.Value != nil {
p := values.Proto(state.Outputs.Value)
ob, err := proto.Marshal(p)
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/store/store_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ func Test_StoreDB_UpdateStep(t *testing.T) {
require.NoError(t, err)

stepOne.Inputs = nm
stepOne.Outputs = &StepOutput{Err: errors.New("some error")}
stepOne.Outputs = StepOutput{Err: errors.New("some error")}

es, err = store.UpsertStep(tests.Context(t), stepOne)
require.NoError(t, err)

gotStep := es.Steps[stepOne.Ref]
assert.Equal(t, stepOne, gotStep)

stepTwo.Outputs = &StepOutput{Value: nm}
stepTwo.Outputs = StepOutput{Value: nm}
es, err = store.UpsertStep(tests.Context(t), stepTwo)
require.NoError(t, err)

Expand Down

0 comments on commit 68e00b3

Please sign in to comment.