diff --git a/common/pinot/page_token_test.go b/common/pinot/page_token_test.go new file mode 100644 index 00000000000..ebbe59b2a6c --- /dev/null +++ b/common/pinot/page_token_test.go @@ -0,0 +1,55 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package pinot + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSerializePageToken(t *testing.T) { + token := &PinotVisibilityPageToken{ + From: 1, + } + + tests := map[string]struct { + token *PinotVisibilityPageToken + expectedOutput []byte + expectedError error + }{ + "Case2: normal case with nil response": { + token: token, + expectedOutput: []byte(`{"From":1}`), + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualOutput, err := SerializePageToken(test.token) + assert.Equal(t, test.expectedOutput, actualOutput) + assert.Nil(t, err) + }) + } +} diff --git a/common/pinot/pinotClient.go b/common/pinot/pinot_client.go similarity index 100% rename from common/pinot/pinotClient.go rename to common/pinot/pinot_client.go diff --git a/common/pinot/pinotClient_test.go b/common/pinot/pinot_client_test.go similarity index 53% rename from common/pinot/pinotClient_test.go rename to common/pinot/pinot_client_test.go index fe4c1fcfe6f..37545133193 100644 --- a/common/pinot/pinotClient_test.go +++ b/common/pinot/pinot_client_test.go @@ -24,12 +24,17 @@ package pinot import ( "fmt" + "net/http" + "net/http/httptest" + "strings" "testing" "time" "github.com/startreedata/pinot-client-go/pinot" "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/testlogger" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) @@ -51,6 +56,165 @@ var ( } ) +func TestSearch(t *testing.T) { + query := "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10" + + request := &SearchRequest{ + Query: query, + ListRequest: &p.InternalListWorkflowExecutionsRequest{ + NextPageToken: nil, + }, + } + + errorRequest := &SearchRequest{ + Query: "error", + ListRequest: &p.InternalListWorkflowExecutionsRequest{ + NextPageToken: []byte("ha-ha"), + }, + } + + tests := map[string]struct { + inputRequest *SearchRequest + expectedError error + server *httptest.Server + }{ + "Case1-1: error internal server case": { + inputRequest: errorRequest, + expectedError: &types.InternalServiceError{ + Message: fmt.Sprintf("Pinot Search failed, caught http exception when querying Pinot: 400 Bad Request"), + }, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + })), + }, + "Case1-2: error json conversion case": { + inputRequest: errorRequest, + expectedError: &types.InternalServiceError{ + Message: fmt.Sprintf("Get NextPage token failed, unable to deserialize page token. err: invalid character 'h' looking for beginning of value"), + }, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + assert.Equal(t, "POST", r.Method) + assert.True(t, strings.HasSuffix(r.RequestURI, "/query/sql")) + fmt.Fprintln(w, "{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[97.889]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") + })), + }, + "Case2: normal case": { + inputRequest: request, + expectedError: nil, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + assert.Equal(t, "POST", r.Method) + assert.True(t, strings.HasSuffix(r.RequestURI, "/query/sql")) + fmt.Fprintln(w, "{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[97889]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") + })), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ts := test.server + defer ts.Close() + pinotConnection, err := pinot.NewFromBrokerList([]string{ts.URL}) + assert.NotNil(t, pinotConnection) + assert.Nil(t, err) + + pinotClient := NewPinotClient(pinotConnection, testlogger.New(t), &config.PinotVisibilityConfig{ + Table: "", + ServiceName: "", + }) + + actualOutput, err := pinotClient.Search(test.inputRequest) + + if test.expectedError != nil { + assert.Nil(t, actualOutput) + assert.Equal(t, test.expectedError.Error(), err.Error()) + } else { + assert.NotNil(t, actualOutput) + assert.Nil(t, err) + } + }) + } +} + +func TestCountByQuery(t *testing.T) { + errorQuery := "error" + query := "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10" + + tests := map[string]struct { + inputQuery string + expectedOutput int64 + expectedError error + server *httptest.Server + }{ + "Case1-1: error internal server case": { + inputQuery: errorQuery, + expectedOutput: 0, + expectedError: &types.InternalServiceError{ + Message: fmt.Sprintf("CountWorkflowExecutions ExecuteSQL failed, caught http exception when querying Pinot: 400 Bad Request"), + }, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + })), + }, + "Case1-2: error json conversion case": { + inputQuery: errorQuery, + expectedOutput: -1, + expectedError: &types.InternalServiceError{ + Message: fmt.Sprintf("can't convert result to integer!, query = error, query result = 97.889, err = strconv.ParseInt: parsing \"97.889\": invalid syntax"), + }, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + assert.Equal(t, "POST", r.Method) + assert.True(t, strings.HasSuffix(r.RequestURI, "/query/sql")) + fmt.Fprintln(w, "{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[97.889]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") + })), + }, + "Case2: normal case": { + inputQuery: query, + expectedOutput: 97889, + expectedError: nil, + server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + assert.Equal(t, "POST", r.Method) + assert.True(t, strings.HasSuffix(r.RequestURI, "/query/sql")) + fmt.Fprintln(w, "{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[97889]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") + })), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ts := test.server + defer ts.Close() + pinotConnection, err := pinot.NewFromBrokerList([]string{ts.URL}) + assert.NotNil(t, pinotConnection) + assert.Nil(t, err) + + pinotClient := NewPinotClient(pinotConnection, testlogger.New(t), &config.PinotVisibilityConfig{ + Table: "", + ServiceName: "", + }) + + actualOutput, err := pinotClient.CountByQuery(test.inputQuery) + assert.Equal(t, test.expectedOutput, actualOutput) + if test.expectedError != nil { + assert.Equal(t, test.expectedError.Error(), err.Error()) + } else { + assert.Nil(t, err) + } + }) + } +} + +func TestGetTableName(t *testing.T) { + assert.Equal(t, "", client.GetTableName()) +} + func TestBuildMap(t *testing.T) { columnName := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "TaskList", "IsCron", "NumClusters", "UpdateTime", "CustomIntField", "CustomStringField"} hit := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "tsklst", true, 1, testEarliestTime, 1, "some string"} @@ -74,28 +238,42 @@ func TestBuildMap(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - assert.NotPanics(t, func() { - resMap := buildMap(test.inputHit, test.inputColumnNames) - assert.Equal(t, test.expectedMap, resMap) - }) + resMap := buildMap(test.inputHit, test.inputColumnNames) + assert.Equal(t, test.expectedMap, resMap) }) } } func TestConvertSearchResultToVisibilityRecord(t *testing.T) { columnName := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "TaskList", "IsCron", "NumClusters", "UpdateTime", "Attr"} - hit := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "tsklst", true, 1, testEarliestTime, "{}"} - closeStatus := types.WorkflowExecutionCloseStatusFailed - - columnNameWithAttr := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "TaskList", "IsCron", "NumClusters", "UpdateTime", "Attr"} + hit := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, -1, 1, "tsklst", true, 1, testEarliestTime, "{}"} hitWithAttr := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "tsklst", true, 1, testEarliestTime, `{"CustomStringField": "customA and customB or customC", "CustomDoubleField": 3.14}`} + hitWithAttrMarshalError := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "tsklst", true, 1, testEarliestTime, make(chan int)} + hitWithAttrUnmarshalError := []interface{}{"wfid", "rid", "wftype", "domainid", testEarliestTime, testEarliestTime, testLatestTime, 1, "1", "tsklst", true, 1, testEarliestTime, `{"CustomStringField": "customA and customB or customC", "CustomDoubleField": 3.14}`} + + closeStatus := types.WorkflowExecutionCloseStatusFailed tests := map[string]struct { inputColumnNames []string inputHit []interface{} expectedVisibilityRecord *p.InternalVisibilityWorkflowExecutionInfo }{ - "Case1: with everything except for an empty Attr": { + "Case1: nil result": { + inputColumnNames: nil, + inputHit: hit, + expectedVisibilityRecord: nil, + }, + "Case2-1: marshal system key error case": { + inputColumnNames: columnName, + inputHit: hitWithAttrMarshalError, + expectedVisibilityRecord: nil, + }, + "Case2-2: unmarshal system key error case": { + inputColumnNames: columnName, + inputHit: hitWithAttrUnmarshalError, + expectedVisibilityRecord: nil, + }, + "Case3-1: with everything except for an empty Attr": { inputColumnNames: columnName, inputHit: hit, expectedVisibilityRecord: &p.InternalVisibilityWorkflowExecutionInfo{ @@ -107,7 +285,7 @@ func TestConvertSearchResultToVisibilityRecord(t *testing.T) { StartTime: time.UnixMilli(testEarliestTime), ExecutionTime: time.UnixMilli(testEarliestTime), CloseTime: time.UnixMilli(testLatestTime), - Status: &closeStatus, + Status: nil, HistoryLength: 1, Memo: nil, TaskList: "tsklst", @@ -118,8 +296,8 @@ func TestConvertSearchResultToVisibilityRecord(t *testing.T) { ShardID: 0, }, }, - "Case2: with everything": { - inputColumnNames: columnNameWithAttr, + "Case3-2: with everything": { + inputColumnNames: columnName, inputHit: hitWithAttr, expectedVisibilityRecord: &p.InternalVisibilityWorkflowExecutionInfo{ DomainID: "domainid", @@ -146,7 +324,7 @@ func TestConvertSearchResultToVisibilityRecord(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { assert.NotPanics(t, func() { - visibilityRecord := ConvertSearchResultToVisibilityRecord(test.inputHit, test.inputColumnNames, nil) + visibilityRecord := ConvertSearchResultToVisibilityRecord(test.inputHit, test.inputColumnNames, testlogger.New(t)) assert.Equal(t, test.expectedVisibilityRecord, visibilityRecord) }) }) @@ -285,21 +463,45 @@ func TestGetInternalGetClosedWorkflowExecutionResponse(t *testing.T) { MinConsumingFreshnessTimeMs: 1, } - result, err := client.getInternalGetClosedWorkflowExecutionResponse(brokerResponse) - - assert.Equal(t, "wfid1", result.Execution.WorkflowID) - assert.Equal(t, "rid1", result.Execution.RunID) - assert.Equal(t, "wftype1", result.Execution.WorkflowType) - assert.Equal(t, "domainid1", result.Execution.DomainID) - assert.Equal(t, time.UnixMilli(testEarliestTime), result.Execution.StartTime) - assert.Equal(t, time.UnixMilli(testEarliestTime), result.Execution.ExecutionTime) - assert.Equal(t, time.UnixMilli(testLatestTime), result.Execution.CloseTime) - assert.Equal(t, types.WorkflowExecutionCloseStatus(1), *result.Execution.Status) - assert.Equal(t, int64(1), result.Execution.HistoryLength) - assert.Equal(t, "tsklst1", result.Execution.TaskList) - assert.Equal(t, true, result.Execution.IsCron) - assert.Equal(t, int16(1), result.Execution.NumClusters) - assert.Equal(t, time.UnixMilli(testEarliestTime), result.Execution.UpdateTime) + tests := map[string]struct { + input *pinot.BrokerResponse + isInputEmpty bool + expectedError error + }{ + "Case1: empty case": { + input: nil, + isInputEmpty: true, + expectedError: nil, + }, + "Case2: with everything": { + input: brokerResponse, + isInputEmpty: false, + expectedError: nil, + }, + } - assert.Nil(t, err) + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualOutput, actualError := client.getInternalGetClosedWorkflowExecutionResponse(test.input) + if test.isInputEmpty { + assert.Nil(t, actualOutput) + assert.Nil(t, actualError) + } else { + assert.Equal(t, "wfid1", actualOutput.Execution.WorkflowID) + assert.Equal(t, "rid1", actualOutput.Execution.RunID) + assert.Equal(t, "wftype1", actualOutput.Execution.WorkflowType) + assert.Equal(t, "domainid1", actualOutput.Execution.DomainID) + assert.Equal(t, time.UnixMilli(testEarliestTime), actualOutput.Execution.StartTime) + assert.Equal(t, time.UnixMilli(testEarliestTime), actualOutput.Execution.ExecutionTime) + assert.Equal(t, time.UnixMilli(testLatestTime), actualOutput.Execution.CloseTime) + assert.Equal(t, types.WorkflowExecutionCloseStatus(1), *actualOutput.Execution.Status) + assert.Equal(t, int64(1), actualOutput.Execution.HistoryLength) + assert.Equal(t, "tsklst1", actualOutput.Execution.TaskList) + assert.Equal(t, true, actualOutput.Execution.IsCron) + assert.Equal(t, int16(1), actualOutput.Execution.NumClusters) + assert.Equal(t, time.UnixMilli(testEarliestTime), actualOutput.Execution.UpdateTime) + assert.Nil(t, actualError) + } + }) + } }