Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in Describe handler #6312

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,3 +1997,40 @@ func HasMoreRowsToDelete(rowsDeleted, batchSize int) bool {
}
return true
}

func (e *WorkflowExecutionInfo) CopyMemo() map[string][]byte {
if e.Memo == nil {
return nil
}
memo := make(map[string][]byte)
for k, v := range e.Memo {
val := make([]byte, len(v))
copy(val, v)
memo[k] = val
}
return memo
}

func (e *WorkflowExecutionInfo) CopySearchAttributes() map[string][]byte {
if e.SearchAttributes == nil {
return nil
}
searchAttr := make(map[string][]byte)
for k, v := range e.SearchAttributes {
val := make([]byte, len(v))
copy(val, v)
searchAttr[k] = val
}
return searchAttr
}

func (e *WorkflowExecutionInfo) CopyPartitionConfig() map[string]string {
if e.PartitionConfig == nil {
return nil
}
partitionConfig := make(map[string]string)
for k, v := range e.PartitionConfig {
partitionConfig[k] = v
}
return partitionConfig
}
277 changes: 277 additions & 0 deletions common/persistence/data_manager_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package persistence

import (
"bytes"
"errors"
"fmt"
"testing"
Expand Down Expand Up @@ -232,3 +233,279 @@ func TestTimeStampConvertion(t *testing.T) {
unixNanoTime := DBTimestampToUnixNano(milisSecond)
assert.Equal(t, timeNow.UnixNano()/(1000*1000), unixNanoTime/(1000*1000)) // unixNano to milisSecond will result in info loss
}

func TestCopyMemo(t *testing.T) {
tests := []struct {
name string
inputMemo map[string][]byte
expectedOutput map[string][]byte
}{
{
name: "TC1: Memo is nil",
inputMemo: nil,
expectedOutput: nil,
},
{
name: "TC2: Memo is empty",
inputMemo: map[string][]byte{},
expectedOutput: map[string][]byte{},
},
{
name: "TC3: Memo contains multiple entries",
inputMemo: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
},
expectedOutput: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
},
},
{
name: "TC4: Memo contains empty byte slices",
inputMemo: map[string][]byte{
"key1": []byte(""),
"key2": []byte{},
},
expectedOutput: map[string][]byte{
"key1": []byte(""),
"key2": []byte{},
},
},
{
name: "TC5: Memo contains duplicate byte slices",
inputMemo: map[string][]byte{
"key1": []byte("dup"),
"key2": []byte("dup"),
},
expectedOutput: map[string][]byte{
"key1": []byte("dup"),
"key2": []byte("dup"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
Memo: tt.inputMemo,
}

copyMemo := execInfo.CopyMemo()

// Check if both are nil
if tt.expectedOutput == nil {
if copyMemo != nil {
t.Errorf("Expected nil, got %v", copyMemo)
}
return
}

// Check if lengths match
if len(copyMemo) != len(tt.expectedOutput) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutput), len(copyMemo))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutput {
copiedVal, exists := copyMemo[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if !bytes.Equal(copiedVal, expectedVal) {
t.Errorf("For key %s, expected value %v, got %v", key, expectedVal, copiedVal)
}

// Ensure that the byte slices are different underlying arrays (deep copy)
if len(copiedVal) > 0 && &copiedVal[0] == &expectedVal[0] {
t.Errorf("For key %s, byte slices reference the same underlying array", key)
}
}
})
}
}

func TestCopySearchAttributes(t *testing.T) {
tests := []struct {
name string
inputSearchAttrs map[string][]byte
expectedOutputAttrs map[string][]byte
}{
{
name: "TC1: SearchAttributes is nil",
inputSearchAttrs: nil,
expectedOutputAttrs: nil,
},
{
name: "TC2: SearchAttributes is empty",
inputSearchAttrs: map[string][]byte{},
expectedOutputAttrs: map[string][]byte{},
},
{
name: "TC3: SearchAttributes contains multiple entries",
inputSearchAttrs: map[string][]byte{
"attr1": []byte("value1"),
"attr2": []byte("value2"),
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte("value1"),
"attr2": []byte("value2"),
},
},
{
name: "TC4: SearchAttributes contains empty byte slices",
inputSearchAttrs: map[string][]byte{
"attr1": []byte(""),
"attr2": []byte{},
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte(""),
"attr2": []byte{},
},
},
{
name: "TC5: SearchAttributes contains duplicate byte slices",
inputSearchAttrs: map[string][]byte{
"attr1": []byte("dup"),
"attr2": []byte("dup"),
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte("dup"),
"attr2": []byte("dup"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
SearchAttributes: tt.inputSearchAttrs,
}

copyAttrs := execInfo.CopySearchAttributes()
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved

// Check if both are nil
if tt.expectedOutputAttrs == nil {
if copyAttrs != nil {
t.Errorf("Expected nil, got %v", copyAttrs)
}
return
}

// Check if lengths match
if len(copyAttrs) != len(tt.expectedOutputAttrs) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutputAttrs), len(copyAttrs))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutputAttrs {
copiedVal, exists := copyAttrs[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if !bytes.Equal(copiedVal, expectedVal) {
t.Errorf("For key %s, expected value %v, got %v", key, expectedVal, copiedVal)
}

// Ensure that the byte slices are different underlying arrays (deep copy)
if len(copiedVal) > 0 && &copiedVal[0] == &expectedVal[0] {
t.Errorf("For key %s, byte slices reference the same underlying array", key)
}
}
})
}
}

func TestCopyPartitionConfig(t *testing.T) {
tests := []struct {
name string
inputPartitionConfig map[string]string
expectedOutputConfig map[string]string
}{
{
name: "TC1: PartitionConfig is nil",
inputPartitionConfig: nil,
expectedOutputConfig: nil,
},
{
name: "TC2: PartitionConfig is empty",
inputPartitionConfig: map[string]string{},
expectedOutputConfig: map[string]string{},
},
{
name: "TC3: PartitionConfig contains multiple entries",
inputPartitionConfig: map[string]string{
"partition1": "config1",
"partition2": "config2",
},
expectedOutputConfig: map[string]string{
"partition1": "config1",
"partition2": "config2",
},
},
{
name: "TC4: PartitionConfig contains empty strings",
inputPartitionConfig: map[string]string{
"partition1": "",
"partition2": "",
},
expectedOutputConfig: map[string]string{
"partition1": "",
"partition2": "",
},
},
{
name: "TC5: PartitionConfig contains duplicate values",
inputPartitionConfig: map[string]string{
"partition1": "dup",
"partition2": "dup",
},
expectedOutputConfig: map[string]string{
"partition1": "dup",
"partition2": "dup",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
PartitionConfig: tt.inputPartitionConfig,
}

copyConfig := execInfo.CopyPartitionConfig()

// Check if both are nil
if tt.expectedOutputConfig == nil {
if copyConfig != nil {
t.Errorf("Expected nil, got %v", copyConfig)
}
return
}

// Check if lengths match
if len(copyConfig) != len(tt.expectedOutputConfig) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutputConfig), len(copyConfig))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutputConfig {
copiedVal, exists := copyConfig[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if copiedVal != expectedVal {
t.Errorf("For key %s, expected value %s, got %s", key, expectedVal, copiedVal)
}
}

// Since strings are immutable in Go, no need to check underlying references
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
StartTime: common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
HistoryLength: mutableState.GetNextEventID() - common.FirstEventID,
AutoResetPoints: executionInfo.AutoResetPoints,
Memo: &types.Memo{Fields: executionInfo.Memo},
Memo: &types.Memo{Fields: executionInfo.CopyMemo()},
IsCron: len(executionInfo.CronSchedule) > 0,
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
PartitionConfig: executionInfo.PartitionConfig,
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.CopySearchAttributes()},
PartitionConfig: executionInfo.CopyPartitionConfig(),
},
}

Expand Down
Loading