Skip to content

Commit

Permalink
Add emitWorkflowVersionMetrics for pinot (#6190)
Browse files Browse the repository at this point in the history
* add emitWorkflowVersionMetrics for pinot

* fmt

* Update service/worker/esanalyzer/workflow.go

Co-authored-by: Shijie Sheng <shengs@uber.com>

* handle aggr errors

* wrap an error

---------

Co-authored-by: Shijie Sheng <shengs@uber.com>
  • Loading branch information
bowenxia and shijiesheng authored Jul 30, 2024
1 parent e60d167 commit 9a7a8a4
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 6 deletions.
152 changes: 149 additions & 3 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func TestEmitWorkflowTypeCountMetricsESErrorCases(t *testing.T) {
Aggregations: map[string]json.RawMessage{},
}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"),
},
"Case4: error unmarshalling aggregation": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) {
Aggregations: map[string]json.RawMessage{},
}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"),
},
"Case4: error unmarshalling aggregation": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -596,7 +596,7 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"),
},
"Case4: error parsing workflow count": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -627,3 +627,149 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
})
}
}

// TestEmitWorkflowVersionMetricsPinot drives emitWorkflowVersionMetricsPinot
// with mocked domain-cache and Pinot clients. Each case registers the exact
// GetDomain/SearchAggr calls it expects and the error (if any) the method
// should return; errors are compared by message.
func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
	mockPinotConfig := &config.PinotVisibilityConfig{
		Table: "test",
	}
	mockESConfig := &config.ElasticSearchConfig{
		Indices: map[string]string{
			common.VisibilityAppName: "test",
		},
	}

	ctrl := gomock.NewController(t)

	mockPinotClient := pinot.NewMockGenericClient(ctrl)
	mockDomainCache := cache.NewMockDomainCache(ctrl)
	testAnalyzer := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
	testWorkflow := &Workflow{analyzer: testAnalyzer}

	tests := map[string]struct {
		domainCacheAffordance func(mockDomainCache *cache.MockDomainCache)
		PinotClientAffordance func(mockPinotClient *pinot.MockGenericClient)
		expectedErr           error
	}{
		"Case0: success": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				// One lookup for the workflow-type query plus one per returned
				// workflow type (two types below) for the version queries.
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(3)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				// First call: workflow types; following calls: versions per type.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version0", 1},
					{"test-wf-version1", 20},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version3", 10},
					{"test-wf-version4", 2},
				}, nil).Times(1)
			},
			expectedErr: nil,
		},
		"Case1: error getting domain": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {},
			expectedErr:           fmt.Errorf("domain error"),
		},
		"Case2: error Pinot query": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
			},
			expectedErr: fmt.Errorf("failed to query Pinot to find workflow type count Info: test-domain, error: pinot error"),
		},
		"Case3: Aggregation is empty": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"),
		},
		"Case4: error parsing workflow count": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				// The count column is a string, so the int assertion must fail.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test", "invalid"},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error parsing workflow count for workflow type test"),
		},
		"Case5-1: failure case in queryWorkflowVersionsWithType: getWorkflowVersionPinotQuery error": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				// First lookup (type query) succeeds, second (version query) fails.
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(1)
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: domain error"),
		},
		"Case5-2: failure case in queryWorkflowVersionsWithType: SearchAggr error": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: pinot error"),
		},
		"Case5-3: failure case in queryWorkflowVersionsWithType: error parsing workflow count": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				// 1.5 is a float64, so the int assertion on the count must fail.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version0", 1.5},
					{"test-wf-version1", 20},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: " +
				"test-wf-type0: error: error parsing workflow count for workflow version test-wf-version0"),
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			// Set up mocks
			test.domainCacheAffordance(mockDomainCache)
			test.PinotClientAffordance(mockPinotClient)

			err := testWorkflow.emitWorkflowVersionMetricsPinot("test-domain", zap.NewNop())

			// Branch on the expectation rather than on the result: calling
			// expectedErr.Error() when a success case unexpectedly fails would
			// panic on a nil error instead of reporting a test failure.
			if test.expectedErr == nil {
				assert.NoError(t, err)
				return
			}
			assert.EqualError(t, err, test.expectedErr.Error())
		})
	}
}
4 changes: 2 additions & 2 deletions service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsPinot(domainName string, logger *
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountPinotQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName)
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
for _, row := range response {
Expand Down Expand Up @@ -267,7 +267,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsES(ctx context.Context, domainNam
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in ES: %s", domainName)
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
err = json.Unmarshal(agg, &domainWorkflowTypeCount)
Expand Down
164 changes: 163 additions & 1 deletion service/worker/esanalyzer/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

"github.com/uber/cadence/common/pinot"
)

const (
Expand Down Expand Up @@ -146,6 +148,47 @@ func (w *Workflow) getWorkflowVersionQuery(domainName string) (string, error) {
`, domain.GetInfo().ID), nil
}

// getWorkflowTypePinotQuery builds the Pinot SQL statement that counts
// workflow executions per WorkflowType for the given domain. The domain name
// is resolved to its ID through the domain cache; a lookup failure is
// returned unchanged together with an empty query.
func (w *Workflow) getWorkflowTypePinotQuery(domainName string) (string, error) {
	entry, err := w.analyzer.domainCache.GetDomain(domainName)
	if err != nil {
		return "", err
	}

	// "StartTime > 0" filters out uninitialized executions, i.e. records that
	// have no start time yet. The result is capped at 10 rows to mirror the
	// ES path, where an aggregation returns the top 10 buckets by default.
	const queryTemplate = `
SELECT WorkflowType, COUNT(*) AS count
FROM %s
WHERE DomainID = '%s'
AND CloseStatus = -1
AND StartTime > 0
GROUP BY WorkflowType
ORDER BY count DESC
LIMIT 10
OFFSET 0
`
	query := fmt.Sprintf(queryTemplate, w.analyzer.pinotTableName, entry.GetInfo().ID)
	return query, nil
}

// getWorkflowVersionPinotQuery builds the Pinot SQL statement that counts
// workflow executions of type wfType in the given domain, grouped by the
// CadenceChangeVersion value extracted from the Attr JSON column.
//
// The domain name is resolved to its ID through the domain cache; a lookup
// failure is returned unchanged together with an empty query.
func (w *Workflow) getWorkflowVersionPinotQuery(domainName string, wfType string) (string, error) {
	domain, err := w.analyzer.domainCache.GetDomain(domainName)
	if err != nil {
		return "", err
	}

	// wfType is placed inside a single-quoted SQL string literal below. Double
	// any embedded single quote so a workflow type name containing "'" cannot
	// terminate the literal early. Scanning bytes is safe for UTF-8 input
	// because the quote is ASCII and never occurs inside a multi-byte sequence.
	escapedType := make([]byte, 0, len(wfType))
	for i := 0; i < len(wfType); i++ {
		if wfType[i] == '\'' {
			escapedType = append(escapedType, '\'')
		}
		escapedType = append(escapedType, wfType[i])
	}

	// exclude uninitialized workflow executions by checking whether record has start time field
	// there's a "LIMIT 10" because in ES, Aggr clause by default returns the top 10 results
	// The GROUP BY item repeats the SELECT expression without an alias: column
	// aliases ("AS ...") belong to the SELECT list only.
	return fmt.Sprintf(`
SELECT JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion, COUNT(*) AS count
FROM %s
WHERE DomainID = '%s'
AND CloseStatus = -1
AND StartTime > 0
AND WorkflowType = '%s'
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY')
ORDER BY count DESC
LIMIT 10
OFFSET 0
`, w.analyzer.pinotTableName, domain.GetInfo().ID, string(escapedType)), nil
}

// emitWorkflowVersionMetrics is an activity that emits the running WF versions of a domain
func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error {
logger := activity.GetLogger(ctx)
Expand All @@ -160,6 +203,8 @@ func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error {
switch w.analyzer.readMode {
case ES:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)
case Pinot:
err = w.emitWorkflowVersionMetricsPinot(domainName, logger)
default:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)
}
Expand All @@ -171,6 +216,123 @@ func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error {
return nil
}

// emitWorkflowVersionMetricsPinot queries Pinot for the workflow types of
// domainName (via getWorkflowTypePinotQuery), looks up the version breakdown
// of every returned type (via queryWorkflowVersionsWithType), and finally
// emits one workflowVersionCountMetrics gauge per (workflow type, version)
// pair, tagged with domain, version and type.
//
// An error is returned when the query cannot be built or executed, when the
// type aggregation is empty, when a result row does not have the expected
// (string, int) shape, or when the per-type version lookup fails. Gauges are
// only emitted after every row has been processed successfully.
func (w *Workflow) emitWorkflowVersionMetricsPinot(domainName string, logger *zap.Logger) error {
	wfVersionPinotQuery, err := w.getWorkflowTypePinotQuery(domainName)
	if err != nil {
		logger.Error("Failed to get Pinot query to find workflow type Info",
			zap.Error(err),
			zap.String("DomainName", domainName),
		)
		return err
	}

	response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery})
	if err != nil {
		logger.Error("Failed to query Pinot to find workflow type count Info",
			zap.Error(err),
			zap.String("VisibilityQuery", wfVersionPinotQuery),
			zap.String("DomainName", domainName),
		)
		return fmt.Errorf("failed to query Pinot to find workflow type count Info: %s, error: %s", domainName, err.Error())
	}

	// An empty aggregation is reported as a failure; there is no underlying
	// error value to attach to the log entry at this point.
	if len(response) == 0 {
		logger.Error("Pinot error: aggregation failed.",
			zap.String("Aggregation", fmt.Sprintf("%v", response)),
			zap.String("DomainName", domainName),
			zap.String("VisibilityQuery", wfVersionPinotQuery),
		)
		return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName)
	}

	var domainWorkflowVersionCount DomainWorkflowVersionCount
	for _, row := range response {
		// Each row is expected to be (WorkflowType string, count int). Validate
		// the shape explicitly so a malformed row yields an error, not a panic.
		if len(row) < 2 {
			logger.Error("Error parsing workflow type row",
				zap.String("Row", fmt.Sprintf("%v", row)),
				zap.String("DomainName", domainName),
			)
			return fmt.Errorf("error parsing workflow type row for domain %s: %v", domainName, row)
		}
		workflowType, ok := row[0].(string)
		if !ok {
			logger.Error("Error parsing workflow type",
				zap.String("WorkflowType", fmt.Sprintf("%v", row[0])),
				zap.String("DomainName", domainName),
			)
			return fmt.Errorf("error parsing workflow type for domain %s: %v", domainName, row[0])
		}
		workflowCount, ok := row[1].(int)
		if !ok {
			logger.Error("Error parsing workflow count",
				zap.String("WorkflowType", workflowType),
				zap.String("DomainName", domainName),
			)
			return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
		}

		workflowVersions, err := w.queryWorkflowVersionsWithType(domainName, workflowType, logger)
		if err != nil {
			logger.Error("Error querying workflow versions",
				zap.Error(err),
				zap.String("WorkflowType", workflowType),
				zap.String("DomainName", domainName),
			)
			// %w keeps the message text identical to the previous %s form while
			// letting callers inspect the cause with errors.Is / errors.As.
			return fmt.Errorf("error querying workflow versions for workflow type: %s: error: %w", workflowType, err)
		}

		domainWorkflowVersionCount.WorkflowTypes = append(domainWorkflowVersionCount.WorkflowTypes, WorkflowTypeCount{
			EsAggregateCount: EsAggregateCount{
				AggregateKey:   workflowType,
				AggregateCount: int64(workflowCount),
			},
			WorkflowVersions: workflowVersions,
		})
	}

	// Emit only after all rows were collected, so a failure above never leaves
	// a partially updated set of gauges for this invocation.
	for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes {
		for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions {
			w.analyzer.tallyScope.Tagged(
				map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey},
			).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount))
		}
	}
	return nil
}

// queryWorkflowVersionsWithType runs the per-type version aggregation (built
// by getWorkflowVersionPinotQuery) against Pinot and converts the rows into a
// WorkflowVersionCount.
//
// An empty result is not an error: it yields a zero-value WorkflowVersionCount.
// Errors from building or executing the query are returned unchanged; a row
// that is not of the shape (version string, count int) produces a parsing
// error. On any error the returned WorkflowVersionCount is the zero value.
func (w *Workflow) queryWorkflowVersionsWithType(domainName string, wfType string, logger *zap.Logger) (WorkflowVersionCount, error) {
	wfVersionPinotQuery, err := w.getWorkflowVersionPinotQuery(domainName, wfType)
	if err != nil {
		logger.Error("Failed to get Pinot query to find workflow version Info",
			zap.Error(err),
			zap.String("DomainName", domainName),
		)
		return WorkflowVersionCount{}, err
	}

	response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery})
	if err != nil {
		logger.Error("Failed to query Pinot to find workflow type count Info",
			zap.Error(err),
			zap.String("VisibilityQuery", wfVersionPinotQuery),
			zap.String("DomainName", domainName),
		)
		return WorkflowVersionCount{}, err
	}

	// if no CadenceChangeVersion is found, return an empty WorkflowVersionCount, no errors
	if len(response) == 0 {
		return WorkflowVersionCount{}, nil
	}

	var workflowVersions WorkflowVersionCount
	for _, row := range response {
		// Validate the row shape before asserting types: an unchecked
		// row[0].(string) would panic if the version column is not a plain
		// string or the row is short.
		if len(row) < 2 {
			logger.Error("Error parsing workflow version row",
				zap.String("Row", fmt.Sprintf("%v", row)),
				zap.String("WorkflowType", wfType),
				zap.String("DomainName", domainName),
			)
			return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow version row for workflow type %s: %v", wfType, row)
		}
		workflowVersion, ok := row[0].(string)
		if !ok {
			logger.Error("Error parsing workflow version",
				zap.String("WorkflowVersion", fmt.Sprintf("%v", row[0])),
				zap.String("WorkflowType", wfType),
				zap.String("DomainName", domainName),
			)
			return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow version for workflow type %s: %v", wfType, row[0])
		}
		workflowCount, ok := row[1].(int)
		if !ok {
			logger.Error("Error parsing workflow count",
				zap.String("WorkflowVersion", workflowVersion),
				zap.String("DomainName", domainName),
			)
			return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for workflow version %s", workflowVersion)
		}
		workflowVersions.WorkflowVersions = append(workflowVersions.WorkflowVersions, EsAggregateCount{
			AggregateKey:   workflowVersion,
			AggregateCount: int64(workflowCount),
		})
	}
	return workflowVersions, nil
}

func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error {
wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName)
if err != nil {
Expand Down Expand Up @@ -198,7 +360,7 @@ func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in ES: %s", domainName)
}
var domainWorkflowVersionCount DomainWorkflowVersionCount
err = json.Unmarshal(agg, &domainWorkflowVersionCount)
Expand Down

0 comments on commit 9a7a8a4

Please sign in to comment.