Skip to content

Commit

Permalink
Fix GetWorkflow to provide latest run id if none was provided to it (#…
Browse files Browse the repository at this point in the history
…294)

* Ensure that GetWorkflow will attempt to retrieve current run ID if one was not provided
* Ensure run id is lazily loaded
  • Loading branch information
Sushisource authored Nov 24, 2020
1 parent df75d9d commit 569d0d4
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 9 deletions.
69 changes: 69 additions & 0 deletions internal/common/util/once_cell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package util

import "sync"

// A OnceCell attempts to match the semantics of Rust's `OnceCell`, but only stores strings, since that's what's needed
// at the moment. Could be changed to use interface{} to be generic.
type OnceCell struct {
// Ensures we only call the fetcher one time
once sync.Once
// Stores the result of calling fetcher
value string
fetcher func() string
}

// Get fetches the value in the cell, calling the fetcher function if it has not yet been called
func (oc *OnceCell) Get() string {
oc.once.Do(func() {
res := oc.fetcher()
oc.value = res
})
return oc.value
}

// PopulatedOnceCell creates an already-initialized cell
func PopulatedOnceCell(value string) OnceCell {
return OnceCell{
once: sync.Once{},
value: value,
fetcher: func() string {
return value
},
}
}

type fetcher func() string

// LazyOnceCell creates a cell with no initial value, the provided function will be called once and only once the first
// time OnceCell.Get is called
func LazyOnceCell(fetcher fetcher) OnceCell {
return OnceCell{
once: sync.Once{},
value: "",
fetcher: fetcher,
}
}
41 changes: 41 additions & 0 deletions internal/common/util/once_cell_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_PopulatedOnceCell(t *testing.T) {
oc := PopulatedOnceCell("hi")
assert.Equal(t, "hi", oc.Get())
}

func Test_LazyOnceCell(t *testing.T) {
oc := LazyOnceCell(func() string { return "hi" })
assert.Equal(t, "hi", oc.Get())
}
40 changes: 31 additions & 9 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"go.temporal.io/sdk/internal/common/backoff"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/serializer"
"go.temporal.io/sdk/internal/common/util"
"go.temporal.io/sdk/log"
)

Expand Down Expand Up @@ -113,7 +114,7 @@ type (
workflowFn interface{}
workflowID string
firstRunID string
currentRunID string
currentRunID *util.OnceCell
iterFn func(ctx context.Context, runID string) HistoryEventIterator
dataConverter converter.DataConverter
registry *registry
Expand Down Expand Up @@ -278,11 +279,12 @@ func (wc *WorkflowClient) ExecuteWorkflow(ctx context.Context, options StartWork
return wc.getWorkflowHistory(fnCtx, workflowID, fnRunID, true, enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, rpcScope)
}

curRunIDCell := util.PopulatedOnceCell(runID)
return &workflowRunImpl{
workflowFn: workflow,
workflowID: workflowID,
firstRunID: runID,
currentRunID: runID,
currentRunID: &curRunIDCell,
iterFn: iterFn,
dataConverter: wc.dataConverter,
registry: wc.registry,
Expand All @@ -293,16 +295,34 @@ func (wc *WorkflowClient) ExecuteWorkflow(ctx context.Context, options StartWork
// reaches the end state, such as workflow finished successfully or timeout.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func (wc *WorkflowClient) GetWorkflow(_ context.Context, workflowID string, runID string) WorkflowRun {
func (wc *WorkflowClient) GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun {

iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
return wc.GetWorkflowHistory(fnCtx, workflowID, fnRunID, true, enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT)
}

// The ID may not actually have been set - if not, we have to (lazily) ask the server for info about the workflow
// execution and extract run id from there. This is definitely less efficient than it could be if there was a more
// specific rpc method for this, or if there were more granular history filters - in which case it could be
// extracted from the `iterFn` inside of `workflowRunImpl`
var runIDCell util.OnceCell
if runID == "" {
fetcher := func() string {
execData, err := wc.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
wc.logger.Error("error while fetching workflow execution info", err)
}
return execData.GetWorkflowExecutionInfo().GetExecution().RunId
}
runIDCell = util.LazyOnceCell(fetcher)
} else {
runIDCell = util.PopulatedOnceCell(runID)
}

return &workflowRunImpl{
workflowID: workflowID,
firstRunID: runID,
currentRunID: runID,
currentRunID: &runIDCell,
iterFn: iterFn,
dataConverter: wc.dataConverter,
registry: wc.registry,
Expand Down Expand Up @@ -420,11 +440,12 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI
return wc.getWorkflowHistory(fnCtx, workflowID, fnRunID, true, enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, rpcScope)
}

curRunIDCell := util.PopulatedOnceCell(response.GetRunId())
return &workflowRunImpl{
workflowFn: workflowFunc,
workflowID: workflowID,
firstRunID: response.GetRunId(),
currentRunID: response.GetRunId(),
currentRunID: &curRunIDCell,
iterFn: iterFn,
dataConverter: wc.dataConverter,
registry: wc.registry,
Expand Down Expand Up @@ -1112,7 +1133,7 @@ func (iter *historyEventIteratorImpl) Next() (*historypb.HistoryEvent, error) {
}

func (workflowRun *workflowRunImpl) GetRunID() string {
return workflowRun.firstRunID
return workflowRun.currentRunID.Get()
}

func (workflowRun *workflowRunImpl) GetID() string {
Expand All @@ -1121,7 +1142,7 @@ func (workflowRun *workflowRunImpl) GetID() string {

func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{}) error {

iter := workflowRun.iterFn(ctx, workflowRun.currentRunID)
iter := workflowRun.iterFn(ctx, workflowRun.currentRunID.Get())
if !iter.HasNext() {
panic("could not get last history event for workflow")
}
Expand Down Expand Up @@ -1154,7 +1175,8 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
err = NewTimeoutError("Workflow timeout", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
attributes := closeEvent.GetWorkflowExecutionContinuedAsNewEventAttributes()
workflowRun.currentRunID = attributes.GetNewExecutionRunId()
curRunID := util.PopulatedOnceCell(attributes.GetNewExecutionRunId())
workflowRun.currentRunID = &curRunID
return workflowRun.Get(ctx, valuePtr)
default:
return fmt.Errorf("unexpected event type %s when handling workflow execution result", closeEvent.GetEventType())
Expand All @@ -1163,7 +1185,7 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
fnName, _ := getWorkflowFunctionName(workflowRun.registry, workflowRun.workflowFn)
err = NewWorkflowExecutionError(
workflowRun.workflowID,
workflowRun.currentRunID,
workflowRun.currentRunID.Get(),
fnName,
err)

Expand Down
18 changes: 18 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"testing"
"time"

workflowpb "go.temporal.io/api/workflow/v1"
ilog "go.temporal.io/sdk/internal/log"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -335,6 +338,7 @@ func (s *workflowRunSuite) SetupTest() {
options := ClientOptions{
MetricsScope: metricsScope,
Identity: identity,
Logger: ilog.NewNopLogger(),
}
s.workflowClient = NewServiceClient(s.workflowServiceClient, nil, options)
s.dataConverter = converter.GetDefaultDataConverter()
Expand Down Expand Up @@ -934,6 +938,20 @@ func (s *workflowRunSuite) TestGetWorkflow() {
s.Equal(workflowResult, decodedResult)
}

// Verify that when `GetWorkflow` is called with no run ID, the current run ID is populated
func (s *workflowRunSuite) TestGetWorkflowNoRunId() {
execution := &commonpb.WorkflowExecution{RunId: runID}
describeResp := &workflowservice.DescribeWorkflowExecutionResponse{
WorkflowExecutionInfo: &workflowpb.WorkflowExecutionInfo{Execution: execution}}
s.workflowServiceClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1)
workflowRunNoRunID := s.workflowClient.GetWorkflow(
context.Background(),
workflowID,
"",
)
s.Equal(runID, workflowRunNoRunID.GetRunID())
}

func getGetWorkflowExecutionHistoryRequest(filterType enumspb.HistoryEventFilterType) *workflowservice.GetWorkflowExecutionHistoryRequest {
request := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: DefaultNamespace,
Expand Down

0 comments on commit 569d0d4

Please sign in to comment.