Skip to content

Commit

Permalink
Fix lost signal from Selector when Default path blocks (temporalio#1682)
Browse files Browse the repository at this point in the history
* initial changes, added replay test for legacy history, need to finish writing tests

* Clean up tests, fix error

* unit test for fixed behavior

* PR feedback

* improve tests, add tests for AddFuture, AddSend

* add integration tests, add debug API to enable SDK flag for tests

* set flag in test itself not workflow, unset flag after test

* unify set/unset function into one
  • Loading branch information
yuandrew authored and reynieroz committed Dec 5, 2024
1 parent a9b8a0f commit 2fd6a07
Show file tree
Hide file tree
Showing 13 changed files with 765 additions and 3 deletions.
249 changes: 249 additions & 0 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
)

Expand Down Expand Up @@ -551,6 +552,254 @@ func TestBlockingSelect(t *testing.T) {
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefault(t *testing.T) {
var history []string
env := &workflowEnvironmentImpl{
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
// Verify that the flag is not set
require.False(t, env.GetFlag(SDKFlagBlockedSelectorSignalReceive))
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)

// Default behavior this signal is lost
require.True(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.False(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefaultWithFlag(t *testing.T) {
var history []string
env := &workflowEnvironmentImpl{
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
require.True(t, env.TryUse(SDKFlagBlockedSelectorSignalReceive))
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)

// Signal should not be lost
require.False(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"c1-one",
"done",
}

require.EqualValues(t, expected, history)
}

func TestBlockingSelectFuture(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
f1, s1 := NewFuture(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")
})
Go(ctx, func(ctx Context) {
history = append(history, "add-two")
s1.SetValue("one-future")
})

selector := NewSelector(ctx)
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
var v string
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddFuture(f1, func(f Future) {
var v string
err := f.Get(ctx, &v)
require.NoError(t, err)
history = append(history, fmt.Sprintf("f1-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)
fmt.Println("select1 done", history)

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")

})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone(), strings.Join(history, "\n"))
expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"c1-one",
"select2",
"f1-one-future",
"done",
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")
})
Go(ctx, func(ctx Context) {
require.True(t, c2.Len() == 1)
history = append(history, "receiver")
var v string
more := c2.Receive(ctx, &v)
require.True(t, more)
history = append(history, fmt.Sprintf("c2-%v", v))
require.True(t, c2.Len() == 0)
})

selector := NewSelector(ctx)
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
var v string
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddSend(c2, "two", func() { history = append(history, "send2") })
history = append(history, "select1")
selector.Select(ctx)

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")

})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone(), strings.Join(history, "\n"))
expected := []string{
"select1",
"add-one",
"add-one-done",
"receiver",
"c1-one",
"select2",
"send2",
"done",
"c2-two",
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectAsyncSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return wc.sdkFlags.tryUse(flag, !wc.isReplay)
}

func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return wc.sdkFlags.getFlag(flag)
}

func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) {
wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f)
}
Expand Down
20 changes: 19 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ const (
// SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method.
// It will also cause the SDK to immediately handle updates when a handler is registered.
SDKPriorityUpdateHandling = 4
SDKFlagUnknown = math.MaxUint32
// SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost
// when the Default path is blocked.
SDKFlagBlockedSelectorSignalReceive = 5
SDKFlagUnknown = math.MaxUint32
)

var unblockSelectorSignal bool

func sdkFlagFromUint(value uint32) sdkFlag {
switch value {
case uint32(SDKFlagUnset):
Expand All @@ -62,6 +67,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKFlagProtocolMessageCommand
case uint32(SDKPriorityUpdateHandling):
return SDKPriorityUpdateHandling
case uint32(SDKFlagBlockedSelectorSignalReceive):
return SDKFlagBlockedSelectorSignalReceive
default:
return SDKFlagUnknown
}
Expand Down Expand Up @@ -105,6 +112,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool {
}
}

// getFlag returns true if the flag is currently set.
func (sf *sdkFlags) getFlag(flag sdkFlag) bool {
return sf.currentFlags[flag] || sf.newFlags[flag]
}

// set marks a flag as in current use regardless of replay status.
func (sf *sdkFlags) set(flags ...sdkFlag) {
if !sf.capabilities.GetSdkMetadata() {
Expand All @@ -131,3 +143,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
}
return flags
}

// SetUnblockSelectorSignal toggles the flag to unblock the selector signal.
// For test use only,
func SetUnblockSelectorSignal(unblockSignal bool) {
unblockSelectorSignal = unblockSignal
}
2 changes: 2 additions & 0 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ type (
DrainUnhandledUpdates() bool
// TryUse returns true if this flag may currently be used.
TryUse(flag sdkFlag) bool
// GetFlag returns if the flag is currently used.
GetFlag(flag sdkFlag) bool
}

// WorkflowDefinitionFactory factory for creating WorkflowDefinition instances.
Expand Down
18 changes: 17 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,24 @@ func (s *selectorImpl) Select(ctx Context) {
if readyBranch != nil {
return false
}
readyBranch = func() {
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
env := getWorkflowEnvironment(ctx)
var dropSignalFlag bool
if unblockSelectorSignal {
dropSignalFlag = env.TryUse(SDKFlagBlockedSelectorSignalReceive)
} else {
dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive)
}

if dropSignalFlag {
c.recValue = &v
}

readyBranch = func() {
if !dropSignalFlag {
c.recValue = &v
}
f(c, more)
}
return true
Expand Down
9 changes: 8 additions & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ type (

workflowFunctionExecuting bool
bufferedUpdateRequests map[string][]func()

sdkFlags *sdkFlags
}

testSessionEnvironmentImpl struct {
Expand Down Expand Up @@ -313,6 +315,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
failureConverter: GetDefaultFailureConverter(),
runTimeout: maxWorkflowTimeout,
bufferedUpdateRequests: make(map[string][]func()),
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
}

if debugMode {
Expand Down Expand Up @@ -605,7 +608,11 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) (
}

func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return true
return env.sdkFlags.tryUse(flag, true)
}

func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return env.sdkFlags.getFlag(flag)
}

func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) {
Expand Down
Loading

0 comments on commit 2fd6a07

Please sign in to comment.