Skip to content

Commit

Permalink
Use Data frames response format (#1099)
Browse files Browse the repository at this point in the history
* Use data frames for numeric data

* Use data frames for text data

* Use data frames for IT services

* fix multiple series

* Convert to the wide format if possible

* Fix table format for text data

* Add refId to the data frames

* Align time series from Zabbix API

* Fill gaps with nulls

* Fix moving average functions

* Option for disabling data alignment

* remove unused logging

* Add labels to data frames

* Detect units

* Set min and max for if percent unit used

* Use value mapping from Zabbix

* Rename unitConverter -> convertZabbixUnit

* More units

* Add missing points in front of each series

* Fix handling table data

* fix db connector data frames handling

* fix it services data frames handling

* Detect all known grafana units

* Chore: remove unused logging

* Fix problems format

* Debug logging: show original units

* Add global option for disabling data alignment

* Add tooltip for the disableDataAlignment feature

* Add note about query options

* Functions for aligning timeseries on the backend
  • Loading branch information
alexanderzobnin authored Dec 22, 2020
1 parent ad378a8 commit 8361817
Show file tree
Hide file tree
Showing 18 changed files with 700 additions and 91 deletions.
7 changes: 2 additions & 5 deletions docs/sources/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,9 @@ Read [how to configure](./direct_db_datasource) SQL data source in Grafana.

**MySQL**, **PostgreSQL**, **InfluxDB** are supported as sources of historical data for the plugin.

### Alerting
### Other

- **Enable alerting**: enable limited alerting support.
- **Add thresholds**: get thresholds info from zabbix triggers and add it to graphs.
For example, if you have trigger `{Zabbix server:system.cpu.util[,iowait].avg(5m)}>20`, threshold will be set to 20.
- **Min severity**: minimum trigger severity for showing alert info (OK/Problem).
- **Disable data alignment**: disable time series data alignment. This feature aligns points based on item update interval. For instance, if value collected once per minute, then timestamp of the each point will be set to the start of corresponding minute. This alignment required for proper work of the stacked graphs. If you don't need stacked graphs and want to get exactly the same timestamps as in Zabbix, then you can disable this feature. Also, data alignment can be toggled for each query individually, in the query options.

Then click _Add_ - datasource will be added and you can check connection using
_Test Connection_ button. This feature can help to find some mistakes like invalid user name
Expand Down
6 changes: 4 additions & 2 deletions docs/sources/configuration/provisioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ datasources:
alerting: true
addThresholds: false
alertingMinSeverity: 3
# Disable acknowledges for read-only users
disableReadOnlyUsersAck: true
# Direct DB Connection options
dbConnectionEnable: true
# Name of existing datasource for Direct DB Connection
dbConnectionDatasourceName: MySQL Zabbix
# Retention policy name (InfluxDB only) for fetching long-term stored data.
# Leave it blank if only default retention policy used.
dbConnectionRetentionPolicy: one_year
# Disable acknowledges for read-only users
disableReadOnlyUsersAck: true
# Disable time series data alignment
disableDataAlignment: false
version: 1
editable: false

Expand Down
49 changes: 49 additions & 0 deletions pkg/datasource/response_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datasource

import (
"fmt"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

func convertHistory(history History, items Items) *data.Frame {
timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFileld.Name = "time"
frame := data.NewFrame("History", timeFileld)

for _, item := range items {
field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
if len(item.Hosts) > 0 {
field.Name = fmt.Sprintf("%s: %s", item.Hosts[0].Name, item.ExpandItem())
} else {
field.Name = item.ExpandItem()
}
frame.Fields = append(frame.Fields, field)
}

for _, point := range history {
for columnIndex, field := range frame.Fields {
if columnIndex == 0 {
ts := time.Unix(point.Clock, point.NS)
field.Append(ts)
} else {
item := items[columnIndex-1]
if point.ItemID == item.ID {
value := point.Value
field.Append(&value)
} else {
field.Append(nil)
}
}
}
}

wideFrame, err := data.LongToWide(frame, &data.FillMissing{Mode: data.FillModeNull})
if err != nil {
backend.Logger.Debug("Error converting data frame to the wide format", "error", err)
return frame
}
return wideFrame
}
43 changes: 2 additions & 41 deletions pkg/datasource/zabbix.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
return nil, err
}

return convertHistory(history, items), nil
frame := convertHistory(history, items)
return frame, nil
}

func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string {
Expand Down Expand Up @@ -472,46 +473,6 @@ func (ds *ZabbixDatasourceInstance) isUseTrend(timeRange backend.TimeRange) bool
return false
}

func convertHistory(history History, items Items) *data.Frame {
timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFileld.Name = "time"
frame := data.NewFrame("History", timeFileld)

for _, item := range items {
field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
if len(item.Hosts) > 0 {
field.Name = fmt.Sprintf("%s: %s", item.Hosts[0].Name, item.ExpandItem())
} else {
field.Name = item.ExpandItem()
}
frame.Fields = append(frame.Fields, field)
}

for _, point := range history {
for columnIndex, field := range frame.Fields {
if columnIndex == 0 {
ts := time.Unix(point.Clock, point.NS)
field.Append(ts)
} else {
item := items[columnIndex-1]
if point.ItemID == item.ID {
value := point.Value
field.Append(&value)
} else {
field.Append(nil)
}
}
}
}

// TODO: convert to wide format
wideFrame, err := data.LongToWide(frame, &data.FillMissing{Mode: data.FillModeNull})
if err == nil {
return wideFrame
}
return frame
}

func parseFilter(filter string) (*regexp.Regexp, error) {
regex := regexp.MustCompile(`^/(.+)/(.*)$`)
flagRE := regexp.MustCompile("[imsU]+")
Expand Down
18 changes: 18 additions & 0 deletions pkg/timeseries/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package timeseries

import "time"

type TimePoint struct {
Time time.Time
Value *float64
}

type TimeSeries []TimePoint

func NewTimeSeries() TimeSeries {
return make(TimeSeries, 0)
}

func (ts *TimeSeries) Len() int {
return len(*ts)
}
153 changes: 153 additions & 0 deletions pkg/timeseries/timeseries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package timeseries

import (
"errors"
"math"
"sort"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

// Aligns point's time stamps according to provided interval.
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
if interval <= 0 || ts.Len() < 2 {
return ts
}

alignedTs := NewTimeSeries()
var frameTs = ts[0].GetTimeFrame(interval)
var pointFrameTs time.Time
var point TimePoint

for i := 1; i < ts.Len(); i++ {
point = ts[i]
pointFrameTs = point.GetTimeFrame(interval)

if pointFrameTs.After(frameTs) {
for frameTs.Before(pointFrameTs) {
alignedTs = append(alignedTs, TimePoint{Time: frameTs, Value: nil})
frameTs = frameTs.Add(interval)
}
}

alignedTs = append(alignedTs, TimePoint{Time: pointFrameTs, Value: point.Value})
frameTs = frameTs.Add(interval)
}

return alignedTs
}

// Detects interval between data points in milliseconds based on median delta between points.
func (ts TimeSeries) DetectInterval() time.Duration {
if ts.Len() < 2 {
return 0
}

deltas := make([]int, 0)
for i := 1; i < ts.Len(); i++ {
delta := ts[i].Time.Sub(ts[i-1].Time)
deltas = append(deltas, int(delta.Milliseconds()))
}
sort.Ints(deltas)
midIndex := int(math.Floor(float64(len(deltas)) * 0.5))
return time.Duration(deltas[midIndex]) * time.Millisecond
}

// Gets point timestamp rounded according to provided interval.
func (p *TimePoint) GetTimeFrame(interval time.Duration) time.Time {
return p.Time.Truncate(interval)
}

func alignDataPoints(frame *data.Frame, interval time.Duration) *data.Frame {
if interval <= 0 || frame.Rows() < 2 {
return frame
}

timeFieldIdx := getTimeFieldIndex(frame)
if timeFieldIdx < 0 {
return frame
}
var frameTs = getPointTimeFrame(getTimestampAt(frame, 0), interval)
var pointFrameTs *time.Time
var pointsInserted = 0

for i := 1; i < frame.Rows(); i++ {
pointFrameTs = getPointTimeFrame(getTimestampAt(frame, i), interval)
if pointFrameTs == nil || frameTs == nil {
continue
}

if pointFrameTs.After(*frameTs) {
for frameTs.Before(*pointFrameTs) {
insertAt := i + pointsInserted
err := insertNullPointAt(frame, *frameTs, insertAt)
if err != nil {
backend.Logger.Debug("Error inserting null point", "error", err)
}
*frameTs = frameTs.Add(interval)
pointsInserted++
}
}

setTimeAt(frame, *pointFrameTs, i+pointsInserted)
*frameTs = frameTs.Add(interval)
}

return frame
}

func getPointTimeFrame(ts *time.Time, interval time.Duration) *time.Time {
if ts == nil {
return nil
}
timeFrame := ts.Truncate(interval)
return &timeFrame
}

func getTimeFieldIndex(frame *data.Frame) int {
for i := 0; i < len(frame.Fields); i++ {
if frame.Fields[i].Type() == data.FieldTypeTime {
return i
}
}

return -1
}

func getTimestampAt(frame *data.Frame, index int) *time.Time {
timeFieldIdx := getTimeFieldIndex(frame)
if timeFieldIdx < 0 {
return nil
}

tsValue := frame.Fields[timeFieldIdx].At(index)
ts, ok := tsValue.(time.Time)
if !ok {
return nil
}

return &ts
}

func insertNullPointAt(frame *data.Frame, frameTs time.Time, index int) error {
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
field.Insert(index, frameTs)
} else if field.Type().Nullable() {
field.Insert(index, nil)
} else {
return errors.New("field is not nullable")
}
}
return nil
}

func setTimeAt(frame *data.Frame, frameTs time.Time, index int) {
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
field.Insert(index, frameTs)
}
}
}
13 changes: 12 additions & 1 deletion src/datasource-zabbix/components/ConfigEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const ConfigEditor = (props: Props) => {
trendsRange: '',
cacheTTL: '',
timeout: '',
disableDataAlignment: false,
...restJsonData,
},
});
Expand Down Expand Up @@ -209,10 +210,20 @@ export const ConfigEditor = (props: Props) => {
<h3 className="page-heading">Other</h3>
<Switch
label="Disable acknowledges for read-only users"
labelClass="width-20"
labelClass="width-16"
checked={options.jsonData.disableReadOnlyUsersAck}
onChange={jsonDataSwitchHandler('disableReadOnlyUsersAck', options, onOptionsChange)}
/>
<Switch
label="Disable data alignment"
labelClass="width-16"
checked={!!options.jsonData.disableDataAlignment}
onChange={jsonDataSwitchHandler('disableDataAlignment', options, onOptionsChange)}
tooltip="Data alignment feature aligns points based on item update interval.
For instance, if value collected once per minute, then timestamp of the each point will be set to the start of corresponding minute.
This alignment required for proper work of the stacked graphs.
If you don't need stacked graphs and want to get exactly the same timestamps as in Zabbix, then you can disable this feature."
/>
</div>
</>
);
Expand Down
Loading

0 comments on commit 8361817

Please sign in to comment.