Skip to content

Commit

Permalink
feat(metrics-operator): implement interface for analysis value retrie…
Browse files Browse the repository at this point in the history
…val in DQL provider (#2194)

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl committed Oct 2, 2023
1 parent 8e5dd76 commit 3d7f737
Show file tree
Hide file tree
Showing 2 changed files with 367 additions and 67 deletions.
190 changes: 123 additions & 67 deletions metrics-operator/controllers/common/providers/dynatrace/dynatrace_dql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ type keptnDynatraceDQLProvider struct {
clock clock.Clock
}

func (d *keptnDynatraceDQLProvider) FetchAnalysisValue(ctx context.Context, query string, analysis metricsapi.Analysis, provider *metricsapi.KeptnMetricsProvider) (string, error) {
//TODO implement me
panic("implement me")
}

type DynatraceDQLHandler struct {
RequestToken string `json:"requestToken"`
}
Expand Down Expand Up @@ -68,6 +63,47 @@ type DQLRequest struct {
RequestTimeoutMilliseconds int `json:"requestTimeoutMilliseconds"`
}

type timeframe struct {
from time.Time
to time.Time
}

type metricRequest struct {
query string
timeframe *timeframe
}

func newMetricRequestFromMetric(metric metricsapi.KeptnMetric) (*metricRequest, error) {
res := &metricRequest{
query: metric.Spec.Query,
}

if metric.Spec.Range != nil {
intervalDuration, err := time.ParseDuration(metric.Spec.Range.Interval)
if err != nil {
return nil, err
}
res.timeframe = &timeframe{
from: time.Now().UTC().Add(-intervalDuration),
to: time.Now().UTC(),
}
}

return res, nil
}

func newMetricRequestFromAnalysis(query string, analysis metricsapi.Analysis) (*metricRequest, error) {
res := &metricRequest{
query: query,
timeframe: &timeframe{
from: analysis.GetFrom(),
to: analysis.GetTo(),
},
}

return res, nil
}

type KeptnDynatraceDQLProviderOption func(provider *keptnDynatraceDQLProvider)

func WithDTAPIClient(dtApiClient dtclient.DTAPIClient) KeptnDynatraceDQLProviderOption {
Expand Down Expand Up @@ -97,8 +133,32 @@ func NewKeptnDynatraceDQLProvider(k8sClient client.Client, opts ...KeptnDynatrac
return provider
}

func (d *keptnDynatraceDQLProvider) FetchAnalysisValue(ctx context.Context, query string, analysis metricsapi.Analysis, provider *metricsapi.KeptnMetricsProvider) (string, error) {
metricsReq, err := newMetricRequestFromAnalysis(query, analysis)
if err != nil {
return "", err
}

results, err := d.getResults(ctx, *metricsReq, *provider)
if err != nil {
return "", err
}

if len(results.Records) > 1 {
d.log.Info("More than a single result, the first one will be used")
}

value := extractValueFromRecord(results.Records[0])

return value, nil
}

func (d *keptnDynatraceDQLProvider) EvaluateQuery(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (string, []byte, error) {
results, err := d.getResults(ctx, metric, provider)
metricsReq, err := newMetricRequestFromMetric(metric)
if err != nil {
return "", nil, err
}
results, err := d.getResults(ctx, *metricsReq, provider)
if err != nil {
return "", nil, err
}
Expand All @@ -121,43 +181,14 @@ func (d *keptnDynatraceDQLProvider) EvaluateQuery(ctx context.Context, metric me
return value, b, nil
}

// extractValueFromRecord extracts the latest value of a record.
// This is intended for timeseries queries that return a single metric
func extractValueFromRecord(record map[string]any) string {
for _, item := range record {
if valuesArr, ok := toFloatArray(item); ok {
return fmt.Sprintf("%f", valuesArr[len(valuesArr)-1])
}
}
return ""
}

func toFloatArray(obj any) ([]float64, bool) {
valuesArr, ok := obj.([]any)
if !ok {
return nil, false
}
res := make([]float64, len(valuesArr))
for index, val := range valuesArr {
if floatVal, ok := val.(float64); ok {
res[index] = floatVal
} else if intVal, ok := val.(int); ok {
res[index] = float64(intVal)
} else {
return nil, false
}
}
return res, true
}

func (d *keptnDynatraceDQLProvider) EvaluateQueryForStep(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) ([]string, []byte, error) {
results, err := d.getResults(ctx, metric, provider)
metricsReq, err := newMetricRequestFromMetric(metric)
if err != nil {
return nil, nil, err
}

if len(results.Records) == 0 {
return nil, nil, ErrInvalidResult
results, err := d.getResults(ctx, *metricsReq, provider)
if err != nil {
return nil, nil, err
}

r := extractValuesFromRecord(results.Records[0])
Expand All @@ -169,14 +200,14 @@ func (d *keptnDynatraceDQLProvider) EvaluateQueryForStep(ctx context.Context, me
return r, b, nil
}

func (d *keptnDynatraceDQLProvider) getResults(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (*DQLResult, error) {
func (d *keptnDynatraceDQLProvider) getResults(ctx context.Context, metricsReq metricRequest, provider metricsapi.KeptnMetricsProvider) (*DQLResult, error) {
if err := d.ensureDTClientIsSetUp(ctx, provider); err != nil {
return nil, err
}

b, status, err := d.postDQL(ctx, metric)
b, status, err := d.postDQL(ctx, metricsReq)
if err != nil {
d.log.Error(err, "Error while posting the DQL query", "query", metric.Spec.Query)
d.log.Error(err, "Error while posting the DQL query", "query", metricsReq.query)
return nil, err
}

Expand All @@ -187,22 +218,6 @@ func (d *keptnDynatraceDQLProvider) getResults(ctx context.Context, metric metri
return results, nil
}

// extractValuesFromRecord extracts all values of a record.
// This is intended for timeseries queries that return multiple values for a single metric, i.e. the individual
// data points of a time series
func extractValuesFromRecord(record map[string]any) []string {
for _, item := range record {
if valuesArr, ok := toFloatArray(item); ok {
valuesStrArr := make([]string, len(valuesArr))
for index, val := range valuesArr {
valuesStrArr[index] = fmt.Sprintf("%f", val)
}
return valuesStrArr
}
}
return []string{}
}

func (d *keptnDynatraceDQLProvider) parseDQLResults(b []byte, status int) (*DQLResult, error) {
results := &DQLResult{}
if status == http.StatusOK {
Expand Down Expand Up @@ -253,15 +268,15 @@ func (d *keptnDynatraceDQLProvider) ensureDTClientIsSetUp(ctx context.Context, p
return nil
}

func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, metric metricsapi.KeptnMetric) ([]byte, int, error) {
func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, metricsReq metricRequest) ([]byte, int, error) {
d.log.V(10).Info("posting DQL")
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

path := defaultPath + "execute"

payload := DQLRequest{
Query: metric.Spec.Query,
Query: metricsReq.query,
DefaultTimeframeStart: "",
DefaultTimeframeEnd: "",
Timezone: "UTC",
Expand All @@ -270,13 +285,9 @@ func (d *keptnDynatraceDQLProvider) postDQL(ctx context.Context, metric metricsa
RequestTimeoutMilliseconds: 1000,
}

if metric.Spec.Range != nil {
intervalDuration, err := time.ParseDuration(metric.Spec.Range.Interval)
if err != nil {
return nil, 0, err
}
payload.DefaultTimeframeStart = time.Now().UTC().Add(-intervalDuration).Format(time.RFC3339)
payload.DefaultTimeframeEnd = time.Now().UTC().Format(time.RFC3339)
if metricsReq.timeframe != nil {
payload.DefaultTimeframeStart = metricsReq.timeframe.from.Format(time.RFC3339)
payload.DefaultTimeframeEnd = metricsReq.timeframe.to.Format(time.RFC3339)
}

payloadBytes, err := json.Marshal(payload)
Expand Down Expand Up @@ -337,3 +348,48 @@ func (d *keptnDynatraceDQLProvider) retrieveDQLResults(ctx context.Context, hand
}
return result, nil
}

// extractValueFromRecord extracts the latest value of a record.
// This is intended for timeseries queries that return a single metric
func extractValueFromRecord(record map[string]any) string {
for _, item := range record {
if valuesArr, ok := toFloatArray(item); ok {
return fmt.Sprintf("%f", valuesArr[len(valuesArr)-1])
}
}
return ""
}

// extractValuesFromRecord extracts all values of a record.
// This is intended for timeseries queries that return multiple values for a single metric, i.e. the individual
// data points of a time series
func extractValuesFromRecord(record map[string]any) []string {
for _, item := range record {
if valuesArr, ok := toFloatArray(item); ok {
valuesStrArr := make([]string, len(valuesArr))
for index, val := range valuesArr {
valuesStrArr[index] = fmt.Sprintf("%f", val)
}
return valuesStrArr
}
}
return []string{}
}

func toFloatArray(obj any) ([]float64, bool) {
valuesArr, ok := obj.([]any)
if !ok {
return nil, false
}
res := make([]float64, len(valuesArr))
for index, val := range valuesArr {
if floatVal, ok := val.(float64); ok {
res[index] = floatVal
} else if intVal, ok := val.(int); ok {
res[index] = float64(intVal)
} else {
return nil, false
}
}
return res, true
}
Loading

0 comments on commit 3d7f737

Please sign in to comment.