From 768cb112a41dfa4a433f1b31cec723437dd159c1 Mon Sep 17 00:00:00 2001 From: taylanisikdemir Date: Tue, 5 Mar 2024 15:27:50 -0800 Subject: [PATCH 1/5] Change noisy frontend poll timeout log to debug level (#5725) --- common/util.go | 2 +- common/util_test.go | 2 +- service/frontend/api/handler.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/util.go b/common/util.go index 08d7bae16e5..2320a8e43d4 100644 --- a/common/util.go +++ b/common/util.go @@ -613,7 +613,7 @@ func ValidateLongPollContextTimeout( return err } if timeout < CriticalLongPollTimeout { - logger.Warn("Context timeout is lower than critical value for long poll API.", + logger.Debug("Context timeout is lower than critical value for long poll API.", tag.WorkflowHandlerName(handlerName), tag.WorkflowPollContextTimeout(timeout)) } return nil diff --git a/common/util_test.go b/common/util_test.go index 8627a81f815..de9a30c0e3f 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -1460,7 +1460,7 @@ func TestValidateLongPollContextTimeout(t *testing.T) { t.Run("context timeout is set, but less than CriticalLongPollTimeout", func(t *testing.T) { logger := new(log.MockLogger) logger.On( - "Warn", + "Debug", "Context timeout is lower than critical value for long poll API.", // we can't mock time between deadline and now, so we just check it as it is mock.Anything, diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index 8f29c1557f4..b67b4f25c9c 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -430,7 +430,7 @@ func (wh *WorkflowHandler) PollForActivityTask( if err := common.ValidateLongPollContextTimeout( ctx, "PollForActivityTask", - wh.GetThrottledLogger(), + wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())), ); err != nil { return nil, err } @@ -478,7 +478,7 @@ func (wh *WorkflowHandler) PollForActivityTask( if err := common.ValidateLongPollContextTimeout( ctx, "PollForActivityTask", - wh.GetThrottledLogger(), + wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())), ); err != nil { return &types.PollForActivityTaskResponse{}, nil } @@ -544,7 +544,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( if err := common.ValidateLongPollContextTimeout( ctx, "PollForDecisionTask", - wh.GetThrottledLogger(), + wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())), ); err != nil { return nil, err } @@ -598,7 +598,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( if err := common.ValidateLongPollContextTimeout( ctx, "PollForDecisionTask", - wh.GetThrottledLogger(), + wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())), ); err != nil { return &types.PollForDecisionTaskResponse{}, nil } From 55ff29b86fc99e1e1ffd879426aa9c855a3be97f Mon Sep 17 00:00:00 2001 From: agautam478 <72432016+agautam478@users.noreply.github.com> Date: Tue, 5 Mar 2024 17:29:50 -0800 Subject: [PATCH 2/5] Added unit tests for nosql_execution_Store_util.go (#5723) --- .../nosql/nosql_execution_store_util_test.go | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 common/persistence/nosql/nosql_execution_store_util_test.go diff --git a/common/persistence/nosql/nosql_execution_store_util_test.go b/common/persistence/nosql/nosql_execution_store_util_test.go new file mode 100644 index 00000000000..d5845d3ce64 --- /dev/null +++ b/common/persistence/nosql/nosql_execution_store_util_test.go @@ -0,0 +1,82 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package nosql + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" +) + +func TestNosqlExecutionStoreUtils(t *testing.T) { + testCases := []struct { + name string + setupStore func(*nosqlExecutionStore) (*nosqlplugin.WorkflowExecutionRequest, error) + input *persistence.InternalWorkflowSnapshot + validate func(*testing.T, *nosqlplugin.WorkflowExecutionRequest, error) + }{ + { + name: "PrepareCreateWorkflowExecutionRequestWithMaps - Success", + setupStore: func(store *nosqlExecutionStore) (*nosqlplugin.WorkflowExecutionRequest, error) { + workflowSnapshot := &persistence.InternalWorkflowSnapshot{ + ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id", + }, + VersionHistories: &persistence.DataBlob{ + Encoding: common.EncodingTypeJSON, + Data: []byte(`[{"Branches":[{"BranchID":"test-branch-id","BeginNodeID":1,"EndNodeID":2}]}]`), + }, + } + return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot) + }, + input: &persistence.InternalWorkflowSnapshot{}, + validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) { + assert.NoError(t, err) + if err == nil { + assert.NotNil(t, req) + } + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockDB := nosqlplugin.NewMockDB(mockCtrl) + store := newTestNosqlExecutionStore(mockDB, log.NewNoop()) + + req, err := tc.setupStore(store) + tc.validate(t, req, err) + }) + } +} From 71a196cfef947dc61ea6acde9b587e0629e619d7 Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 5 Mar 2024 23:39:15 -0800 Subject: [PATCH 3/5] Straightforwardly fixes a few minor copy bugs and adds a small fuzz util (#5572) - Adds the library github.com/google/gofuzz. - Adds a small wrapper for some repo-level conventions we may wish to follow by default - notably determinitism and logging the seed value - Adds some probably fairly low value but still easy to test copy functions as a demonstration of value. --- cmd/server/go.sum | 1 + common/testing/testdatagen/fuzzer.go | 38 ++++++++++ .../testdatagen/idlfuzzedtestdata/history.go | 65 ++++++++++++++++ go.mod | 2 + go.sum | 1 + .../engine/engineimpl/historyEngine2_test.go | 2 + .../historyEngine3_eventsv2_test.go | 2 +- .../history/execution/mutable_state_util.go | 31 ++++---- .../execution/mutable_state_util_test.go | 74 +++++++++++++++++++ 9 files changed, 201 insertions(+), 15 deletions(-) create mode 100644 common/testing/testdatagen/fuzzer.go create mode 100644 common/testing/testdatagen/idlfuzzedtestdata/history.go diff --git a/cmd/server/go.sum b/cmd/server/go.sum index 9740f4bce81..c6d604ed49d 100644 --- a/cmd/server/go.sum +++ b/cmd/server/go.sum @@ -187,6 +187,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= diff --git a/common/testing/testdatagen/fuzzer.go b/common/testing/testdatagen/fuzzer.go new file mode 100644 index 00000000000..e9fcef23fb0 --- /dev/null +++ b/common/testing/testdatagen/fuzzer.go @@ -0,0 +1,38 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package testdatagen + +import ( + "testing" + "time" + + fuzz "github.com/google/gofuzz" +) + +// NewFuzzer creates a new fuzzer, notes down the deterministic seed +func New(t *testing.T, generatorFuncs ...interface{}) *fuzz.Fuzzer { + seed := time.Now().Unix() + t.Log("Fuzz Seed:", seed) + + return fuzz.NewWithSeed(time.Now().Unix()).Funcs(generatorFuncs...).NilChance(0.2) +} diff --git a/common/testing/testdatagen/idlfuzzedtestdata/history.go b/common/testing/testdatagen/idlfuzzedtestdata/history.go new file mode 100644 index 00000000000..c7e2d48c736 --- /dev/null +++ b/common/testing/testdatagen/idlfuzzedtestdata/history.go @@ -0,0 +1,65 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package idlfuzzedtestdata + +import ( + "testing" + + fuzz "github.com/google/gofuzz" + + "github.com/uber/cadence/common/testing/testdatagen" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/testdata" +) + +// NewFuzzerWithIDLTypes creates a new fuzzer, notes down the deterministic seed +// this particular invocation is preconfigured to be able to handle idl structs +// correctly without generating completely invalid data (which, while good to test for +// in the context of an application is too wide a search to be useful) +func NewFuzzerWithIDLTypes(t *testing.T) *fuzz.Fuzzer { + return testdatagen.New(t, + // USE THESE VERY SPARINGLY, ONLY WHEN YOU MUST! + // + // The goal of providing these generators for specific types should be + // to use them as little as possible, as they are fixed test data + // which will not evolve with the idl or functions, therefore + // the main benefit of fuzzing - evolving tests to handle all new fields in place - + // will be defeated. + // + // for example, for mappers, if you add a new field that needs to be + // mapped from protobuf to a native-go type (from the types folder) + // and the testdata is fixed here *and not updated*, then the issue + // will not be caught by any roundtrip tests. + GenHistoryEvent, + ) +} + +// GenHistoryEvent is a function to use with gofuzz which +// skips the majority of difficult to generate values +// for the sake of simplicity in testing. Use it with the fuzz.Funcs(...) generation function +func GenHistoryEvent(o *types.HistoryEvent, c fuzz.Continue) { + // todo (david.porter) setup an assertion to ensure this list is exhaustive + i := c.Rand.Intn(len(testdata.HistoryEventArray) - 1) + o = testdata.HistoryEventArray[i] + return +} diff --git a/go.mod b/go.mod index 7bdb937a621..487f6339943 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,8 @@ require ( gopkg.in/yaml.v2 v2.3.0 ) +require github.com/google/gofuzz v1.0.0 + require ( github.com/BurntSushi/toml v0.4.1 // indirect github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect diff --git a/go.sum b/go.sum index 3d398136a89..2e5d16db790 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,7 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20181127221834-b4f47329b966/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= diff --git a/service/history/engine/engineimpl/historyEngine2_test.go b/service/history/engine/engineimpl/historyEngine2_test.go index d279fba8857..bd3c2e931f7 100644 --- a/service/history/engine/engineimpl/historyEngine2_test.go +++ b/service/history/engine/engineimpl/historyEngine2_test.go @@ -224,6 +224,7 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyExpired() { s.NotNil(response) expectedResponse.StartedTimestamp = response.StartedTimestamp expectedResponse.ScheduledTimestamp = common.Int64Ptr(0) + response.ScheduledTimestamp = common.Int64Ptr(0) expectedResponse.Queries = make(map[string]*types.WorkflowQuery) s.Equal(&expectedResponse, response) } @@ -299,6 +300,7 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { s.NotNil(response) expectedResponse.StartedTimestamp = response.StartedTimestamp expectedResponse.ScheduledTimestamp = common.Int64Ptr(0) + response.ScheduledTimestamp = common.Int64Ptr(0) expectedResponse.Queries = make(map[string]*types.WorkflowQuery) s.Equal(&expectedResponse, response) } diff --git a/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go b/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go index ebce68c5349..363172224c2 100644 --- a/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go +++ b/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go @@ -217,7 +217,7 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { s.Nil(err) s.NotNil(response) expectedResponse.StartedTimestamp = response.StartedTimestamp - expectedResponse.ScheduledTimestamp = common.Int64Ptr(0) + expectedResponse.ScheduledTimestamp = response.ScheduledTimestamp expectedResponse.Queries = make(map[string]*types.WorkflowQuery) s.Equal(&expectedResponse, response) } diff --git a/service/history/execution/mutable_state_util.go b/service/history/execution/mutable_state_util.go index 691b7208748..6d75bf825c2 100644 --- a/service/history/execution/mutable_state_util.go +++ b/service/history/execution/mutable_state_util.go @@ -24,6 +24,8 @@ package execution import ( "encoding/json" + "golang.org/x/exp/slices" + "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/persistence" @@ -328,6 +330,7 @@ func CopyWorkflowExecutionInfo(sourceInfo *persistence.WorkflowExecutionInfo) *p ParentDomainID: sourceInfo.ParentDomainID, ParentWorkflowID: sourceInfo.ParentWorkflowID, ParentRunID: sourceInfo.ParentRunID, + IsCron: sourceInfo.IsCron, InitiatedID: sourceInfo.InitiatedID, CompletionEventBatchID: sourceInfo.CompletionEventBatchID, CompletionEvent: sourceInfo.CompletionEvent, @@ -354,6 +357,7 @@ func CopyWorkflowExecutionInfo(sourceInfo *persistence.WorkflowExecutionInfo) *p DecisionRequestID: sourceInfo.DecisionRequestID, DecisionTimeout: sourceInfo.DecisionTimeout, DecisionAttempt: sourceInfo.DecisionAttempt, + DecisionScheduledTimestamp: sourceInfo.DecisionScheduledTimestamp, DecisionStartedTimestamp: sourceInfo.DecisionStartedTimestamp, DecisionOriginalScheduledTimestamp: sourceInfo.DecisionOriginalScheduledTimestamp, CancelRequested: sourceInfo.CancelRequested, @@ -381,8 +385,7 @@ func CopyWorkflowExecutionInfo(sourceInfo *persistence.WorkflowExecutionInfo) *p // CopyActivityInfo copies ActivityInfo func CopyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.ActivityInfo { - details := make([]byte, len(sourceInfo.Details)) - copy(details, sourceInfo.Details) + details := slices.Clone(sourceInfo.Details) return &persistence.ActivityInfo{ Version: sourceInfo.Version, @@ -437,24 +440,24 @@ func CopyTimerInfo(sourceInfo *persistence.TimerInfo) *persistence.TimerInfo { // CopyCancellationInfo copies RequestCancelInfo func CopyCancellationInfo(sourceInfo *persistence.RequestCancelInfo) *persistence.RequestCancelInfo { return &persistence.RequestCancelInfo{ - Version: sourceInfo.Version, - InitiatedID: sourceInfo.InitiatedID, - CancelRequestID: sourceInfo.CancelRequestID, + Version: sourceInfo.Version, + InitiatedID: sourceInfo.InitiatedID, + InitiatedEventBatchID: sourceInfo.InitiatedEventBatchID, + CancelRequestID: sourceInfo.CancelRequestID, } } // CopySignalInfo copies SignalInfo func CopySignalInfo(sourceInfo *persistence.SignalInfo) *persistence.SignalInfo { result := &persistence.SignalInfo{ - Version: sourceInfo.Version, - InitiatedID: sourceInfo.InitiatedID, - SignalRequestID: sourceInfo.SignalRequestID, - SignalName: sourceInfo.SignalName, - } - result.Input = make([]byte, len(sourceInfo.Input)) - copy(result.Input, sourceInfo.Input) - result.Control = make([]byte, len(sourceInfo.Control)) - copy(result.Control, sourceInfo.Control) + Version: sourceInfo.Version, + InitiatedEventBatchID: sourceInfo.InitiatedEventBatchID, + InitiatedID: sourceInfo.InitiatedID, + SignalRequestID: sourceInfo.SignalRequestID, + SignalName: sourceInfo.SignalName, + } + result.Input = slices.Clone(sourceInfo.Input) + result.Control = slices.Clone(sourceInfo.Control) return result } diff --git a/service/history/execution/mutable_state_util_test.go b/service/history/execution/mutable_state_util_test.go index 3b412654ba6..030062fb97f 100644 --- a/service/history/execution/mutable_state_util_test.go +++ b/service/history/execution/mutable_state_util_test.go @@ -28,9 +28,83 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/testing/testdatagen/idlfuzzedtestdata" "github.com/uber/cadence/common/types" ) +func TestCopyActivityInfo(t *testing.T) { + t.Run("test CopyActivityInfo Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.ActivityInfo{} + f.Fuzz(&d1) + d2 := CopyActivityInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + +func TestCopyWorkflowExecutionInfo(t *testing.T) { + t.Run("test ExecutionInfo Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.WorkflowExecutionInfo{} + f.Fuzz(&d1) + d2 := CopyWorkflowExecutionInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + +func TestCopyTimerInfoMapping(t *testing.T) { + t.Run("test Timer info Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.TimerInfo{} + f.Fuzz(&d1) + d2 := CopyTimerInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + +func TestChildWorkflowMapping(t *testing.T) { + t.Run("test child workflwo info Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.ChildExecutionInfo{} + f.Fuzz(&d1) + d2 := CopyChildInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + +func TestCopySignalInfo(t *testing.T) { + t.Run("test signal info Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.SignalInfo{} + f.Fuzz(&d1) + d2 := CopySignalInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + +func TestCopyCancellationInfo(t *testing.T) { + t.Run("test signal info Mapping", func(t *testing.T) { + f := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + + d1 := persistence.RequestCancelInfo{} + f.Fuzz(&d1) + d2 := CopyCancellationInfo(&d1) + + assert.Equal(t, &d1, d2) + }) +} + func TestFindAutoResetPoint(t *testing.T) { timeSource := clock.NewRealTimeSource() From b443f0273bbe3e175cde2628d724fce4c3a5ddbe Mon Sep 17 00:00:00 2001 From: neil-xie <104041627+neil-xie@users.noreply.github.com> Date: Wed, 6 Mar 2024 01:18:46 -0800 Subject: [PATCH 4/5] Add test for ES v6 client Search method (#5727) * Add test for ES v6 client Search method --- common/elasticsearch/client/v6/client_test.go | 339 +++++++++++++----- 1 file changed, 254 insertions(+), 85 deletions(-) diff --git a/common/elasticsearch/client/v6/client_test.go b/common/elasticsearch/client/v6/client_test.go index d65e68a81a4..217d3002dae 100644 --- a/common/elasticsearch/client/v6/client_test.go +++ b/common/elasticsearch/client/v6/client_test.go @@ -71,39 +71,63 @@ func TestNewV6Client(t *testing.T) { } func TestCreateIndex(t *testing.T) { - testServer := getTestServer(t) + var handlerCalled bool + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handlerCalled = true + if r.URL.Path == "/testIndex" && r.Method == "PUT" { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"acknowledged": true}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }) + elasticV6, testServer := getMockClient(t, handler) defer testServer.Close() - // Create a new MockESClient - mockClient, err := elastic.NewClient( - elastic.SetURL(testServer.URL), - elastic.SetSniff(false), - elastic.SetHealthcheck(false), - elastic.SetHttpClient(testServer.Client()), - ) - assert.NoError(t, err) - - elasticV6 := ElasticV6{ - client: mockClient, - } - err = elasticV6.CreateIndex(context.Background(), "testIndex") + err := elasticV6.CreateIndex(context.Background(), "testIndex") + assert.True(t, handlerCalled, "Expected handler to be called") assert.NoError(t, err) } func TestPutMapping(t *testing.T) { - testServer := getTestServer(t) - defer testServer.Close() - // Create a new MockESClient - mockClient, err := elastic.NewClient( - elastic.SetURL(testServer.URL), - elastic.SetSniff(false), - elastic.SetHealthcheck(false), - elastic.SetHttpClient(testServer.Client())) - assert.NoError(t, err) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_mapping/_doc" && r.Method == "PUT" { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("Failed to read request body: %v", err) + } + defer r.Body.Close() + var receivedMapping map[string]interface{} + if err := json.Unmarshal(body, &receivedMapping); err != nil { + t.Fatalf("Failed to unmarshal request body: %v", err) + } - elasticV6 := ElasticV6{ - client: mockClient, - } - err = elasticV6.PutMapping(context.Background(), "testIndex", `{ + // Define expected mapping structurally + expectedMapping := map[string]interface{}{ + "properties": map[string]interface{}{ + "title": map[string]interface{}{ + "type": "text", + }, + "publish_date": map[string]interface{}{ + "type": "date", + }, + }, + } + + // Compare structurally + if !assert.Equal(t, expectedMapping, receivedMapping) { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"acknowledged": true}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }) + elasticV6, testServer := getMockClient(t, handler) + defer testServer.Close() + err := elasticV6.PutMapping(context.Background(), "testIndex", `{ "properties": { "title": { "type": "text" @@ -117,77 +141,222 @@ func TestPutMapping(t *testing.T) { } func TestCount(t *testing.T) { - testServer := getTestServer(t) - defer testServer.Close() - // Create a new MockESClient - mockClient, err := elastic.NewClient( - elastic.SetURL(testServer.URL), - elastic.SetSniff(false), - elastic.SetHealthcheck(false), - elastic.SetHttpClient(testServer.Client())) - assert.NoError(t, err) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_count" && r.Method == "POST" { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("Failed to read request body: %v", err) + } + defer r.Body.Close() + expectedQuery := `{"query":{"match":{"WorkflowID":"test-workflow-id"}}}` + if string(body) != expectedQuery { + t.Fatalf("Expected query %s, got %s", expectedQuery, body) + } - elasticV6 := ElasticV6{ - client: mockClient, - } + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"count": 42}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }) + elasticV6, testServer := getMockClient(t, handler) + defer testServer.Close() count, err := elasticV6.Count(context.Background(), "testIndex", `{"query":{"match":{"WorkflowID":"test-workflow-id"}}}`) assert.NoError(t, err) assert.Equal(t, int64(42), count) } -func getTestServer(t *testing.T) *httptest.Server { - return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Read the request body - body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("Failed to read request body: %v", err) - } - defer r.Body.Close() - - switch r.URL.Path { - case "/testIndex": - if r.Method == "PUT" { - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"acknowledged": true}`)) - } - case "/testIndex/_mapping/_doc": - if r.Method == "PUT" { - var receivedMapping map[string]interface{} - if err := json.Unmarshal(body, &receivedMapping); err != nil { - t.Fatalf("Failed to unmarshal request body: %v", err) +func TestSearch(t *testing.T) { + testCases := []struct { + name string + query string + expected map[string]interface{} + expectErr bool + expectAgg bool + index string + handler http.HandlerFunc + }{ + { + name: "normal case", + query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`, + index: "testIndex", + expected: map[string]interface{}{ + "WorkflowID": "test-workflow-id", + }, + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_search" { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "took": 5, + "timed_out": false, + "hits": { + "total": 1, + "hits": [{ + "_source": { + "WorkflowID": "test-workflow-id" + }, + "sort": [1] + }] + } + }`)) + } else { + w.WriteHeader(http.StatusNotFound) } - - // Define expected mapping structurally - expectedMapping := map[string]interface{}{ - "properties": map[string]interface{}{ - "title": map[string]interface{}{ - "type": "text", + }), + expectErr: false, + }, + { + name: "elasticsearch error", + query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`, + index: "testIndex", + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_search" { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "error": { + "root_cause": [ + { + "type": "index_not_found_exception", + "reason": "no such index", + "resource.type": "index_or_alias", + "resource.id": "testIndex", + "index_uuid": "_na_", + "index": "testIndex" + } + ], + "type": "index_not_found_exception", + "reason": "no such index", + "resource.type": "index_or_alias", + "resource.id": "testIndex", + "index_uuid": "_na_", + "index": "testIndex" }, - "publish_date": map[string]interface{}{ - "type": "date", + "status": 404 + }`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }), + expectErr: true, + }, + { + name: "elasticsearch timeout", + query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`, + index: "testIndex", + expectErr: true, + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_search" { + w.WriteHeader(http.StatusOK) // Assuming Elasticsearch returns HTTP 200 for timeouts with an indication in the body + w.Write([]byte(`{ + "took": 30, + "timed_out": true, + "hits": { + "total": 0, + "hits": [] + } + }`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }), + }, + { + name: "elasticsearch aggregations", + query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`, + index: "testIndex", + expected: map[string]interface{}{ + "WorkflowID": "test-workflow-id", + }, + expectErr: false, + expectAgg: true, + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_search" { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "took": 5, + "timed_out": false, + "hits": { + "total": 1, + "hits": [{ + "_source": { + "WorkflowID": "test-workflow-id" + } + }] }, - }, + "aggregations": { + "sample_agg": { + "value": 42 + } + } + }`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + }), + }, + { + name: "elasticsearch non exist index", + query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`, + index: "test_failure", + expectErr: true, + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/testIndex/_search" { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusNotFound) } + }), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + elasticV6, testServer := getMockClient(t, tc.handler) + defer testServer.Close() + resp, err := elasticV6.Search(context.Background(), tc.index, tc.query) + if !tc.expectErr { + assert.NoError(t, err) + assert.NotNil(t, resp) + // Verify the response details + assert.Equal(t, int64(5), resp.TookInMillis) + assert.Equal(t, int64(1), resp.TotalHits) + assert.NotNil(t, resp.Hits) + assert.Len(t, resp.Hits.Hits, 1) - // Compare structurally - if !assert.Equal(t, expectedMapping, receivedMapping) { - w.WriteHeader(http.StatusBadRequest) - return + var actual map[string]interface{} + if err := json.Unmarshal([]byte(string(resp.Hits.Hits[0].Source)), &actual); err != nil { + t.Fatalf("Failed to unmarshal actual JSON: %v", err) } + assert.Equal(t, tc.expected, actual) - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"acknowledged": true}`)) - } - case "/testIndex/_count": - expectedQuery := `{"query":{"match":{"WorkflowID":"test-workflow-id"}}}` - if string(body) != expectedQuery { - t.Fatalf("Expected query %s, got %s", expectedQuery, body) + if tc.expectAgg { + // Verify the response includes the expected aggregations + assert.NotNil(t, resp.Aggregations, "Aggregations should not be nil") + assert.Contains(t, resp.Aggregations, "sample_agg", "Aggregations should contain 'sample_agg'") + + // Additional assertions can be made to verify the contents of the aggregation + sampleAgg := resp.Aggregations["sample_agg"] + var aggResult map[string]interface{} + err = json.Unmarshal(sampleAgg, &aggResult) + assert.NoError(t, err) + assert.Equal(t, float64(42), aggResult["value"], "Aggregation 'sample_agg' should have a value of 42") + } + } else { + assert.Error(t, err) } + }) + } +} - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"count": 42}`)) - default: - w.WriteHeader(http.StatusNotFound) - } - })) +func getMockClient(t *testing.T, handler http.HandlerFunc) (ElasticV6, *httptest.Server) { + testServer := httptest.NewTLSServer(handler) + mockClient, err := elastic.NewClient( + elastic.SetURL(testServer.URL), + elastic.SetSniff(false), + elastic.SetHealthcheck(false), + elastic.SetHttpClient(testServer.Client())) + assert.NoError(t, err) + return ElasticV6{ + client: mockClient, + }, testServer } From cb39e8277c8d0876f2d60eddb4272909dd0371b2 Mon Sep 17 00:00:00 2001 From: Abhishek Jha Date: Wed, 6 Mar 2024 08:53:23 -0800 Subject: [PATCH 5/5] Tests for Common/Domain: Adding tests for replication queue message handling and ack update (#5730) * Common/Domain: Adding tests for replication_queue:ReplicationMessages, UpdateAckLevel --- common/domain/replication_queue_test.go | 95 +++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/common/domain/replication_queue_test.go b/common/domain/replication_queue_test.go index 365934f3a13..a795401f18a 100644 --- a/common/domain/replication_queue_test.go +++ b/common/domain/replication_queue_test.go @@ -115,3 +115,98 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) { }) } } + +func TestGetReplicationMessages(t *testing.T) { + + tests := []struct { + name string + lastID int64 + maxCount int + task *types.ReplicationTask + wantErr bool + setupMock func(q *persistence.MockQueueManager) + }{ + { + name: "successful message retrieval", + lastID: 100, + maxCount: 10, + task: &types.ReplicationTask{}, + wantErr: false, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil) + }, + }, + { + name: "read messages fails", + lastID: 100, + maxCount: 10, + wantErr: true, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("read error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + + tt.setupMock(mockQueue) + _, _, err := rq.GetReplicationMessages(context.Background(), tt.lastID, tt.maxCount) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + ctrl.Finish() + }) + } +} + +func TestUpdateAckLevel(t *testing.T) { + tests := []struct { + name string + lastID int64 + cluster string + wantErr bool + setupMock func(q *persistence.MockQueueManager) + }{ + { + name: "successful ack level update", + lastID: 100, + cluster: "testCluster", + wantErr: false, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().UpdateAckLevel(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq("testCluster")).Return(nil) + }, + }, + { + name: "ack level update fails", + lastID: 100, + cluster: "testCluster", + wantErr: true, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().UpdateAckLevel(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq("testCluster")).Return(errors.New("update error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + tt.setupMock(mockQueue) + err := rq.UpdateAckLevel(context.Background(), tt.lastID, tt.cluster) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + ctrl.Finish() + }) + } +}