Skip to content

Commit

Permalink
Merge pull request #14 from szkiba/13-adapt-to-protocol-changes
Browse files Browse the repository at this point in the history
adapt to protocol changes
  • Loading branch information
szkiba authored Sep 16, 2024
2 parents 68a4470 + 97e2de2 commit 915a7c5
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 42 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func RootCmd() *cobra.Command {
Long: rootHelp,
Version: version,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
endpoint, err := url.JoinPath(baseURL, "/events")
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func runCmd() *cobra.Command {
Short: "k6 test runner and terminal-based metrics dashboard viewer",
Long: runHelp,
Version: version,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, args []string) error {
return runRun(args)
},

Expand Down
1 change: 1 addition & 0 deletions internal/digest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
EventTypeCumulative // EventTypeCumulative mean "cumulative" SSE event.
EventTypeStart // EventTypeStart mean "start" SSE event.
EventTypeStop // EventTypeStop mean "stop" SSE event.
EventTypeThreshold // EventTypeThreshold mean "threshold" SSE event.
EventTypeConnect // EventTypeConnect mean SSE channel connected.
EventTypeDisconnect // EventTypeDisconnect mean SSE channel disconnected.
)
Expand Down
28 changes: 16 additions & 12 deletions internal/digest/eventtype_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

195 changes: 168 additions & 27 deletions internal/stream/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@ package stream

import (
"encoding/json"
"errors"
"fmt"
"sort"

"github.com/r3labs/sse/v2"
"github.com/szkiba/xk6-top/internal/digest"
)

func parse(msg *sse.Event) (*digest.Event, error) {
type parser struct {
metrics digest.Metrics
names []string
}

func newParser() *parser {
return &parser{metrics: make(digest.Metrics)}
}

func (p *parser) parse(msg *sse.Event) (*digest.Event, error) {
var (
etype digest.EventType
edata interface{}
Expand All @@ -18,56 +30,185 @@ func parse(msg *sse.Event) (*digest.Event, error) {
return nil, err
}

edata, err = unmarshalData(etype, msg.Data)
edata, err = p.unmarshalData(etype, msg.Data)
if err != nil {
return nil, err
}

return &digest.Event{Type: etype, Data: edata}, nil
}

func unmarshalData(etype digest.EventType, data []byte) (interface{}, error) {
func (p *parser) unmarshalData(etype digest.EventType, data []byte) (interface{}, error) {
switch etype {
case digest.EventTypeMetric:
target := make(digest.Metrics)
return p.parseMetric(data)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
case digest.EventTypeParam:
return p.parseParam(data)

case digest.EventTypeConfig:
return p.parseConfig(data)

case digest.EventTypeStart,
digest.EventTypeStop,
digest.EventTypeSnapshot,
digest.EventTypeCumulative:
if len(data) > 0 && data[0] == '{' {
return p.parseAggregatesLegacy(data)
}

return target, nil
return p.parseAggregates(data)

case digest.EventTypeParam:
target := new(digest.ParamData)
default:
return nil, nil //nolint:nilnil
}
}

if err := json.Unmarshal(data, target); err != nil {
return nil, err
}
func (p *parser) parseMetric(data []byte) (interface{}, error) {
target := make(digest.Metrics)

return target, nil
if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

case digest.EventTypeConfig:
target := make(digest.ConfigData)
for k, v := range target {
v.Name = k

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}
p.metrics[k] = v
}

return target, nil
names := make([]string, 0, len(p.metrics))

case digest.EventTypeStart,
digest.EventTypeStop,
digest.EventTypeSnapshot,
digest.EventTypeCumulative:
target := make(digest.Aggregates)
for name := range p.metrics {
names = append(names, name)
}

sort.Strings(names)

p.names = names

if err := json.Unmarshal(data, &target); err != nil {
return target, nil
}

func (p *parser) parseParam(data []byte) (interface{}, error) {
target := new(digest.ParamData)

if err := json.Unmarshal(data, target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseConfig(data []byte) (interface{}, error) {
target := make(digest.ConfigData)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseAggregatesLegacy(data []byte) (interface{}, error) {
target := make(digest.Aggregates)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseAggregates(data []byte) (interface{}, error) {
var samples [][]float64

if err := json.Unmarshal(data, &samples); err != nil {
return nil, err
}

target := make(digest.Aggregates)

for metricIdx := range samples {
metric, err := p.getMetric(metricIdx)
if err != nil {
return nil, err
}

agg, err := p.parseAggregate(samples[metricIdx], metric.Type)
if err != nil {
return nil, err
}

return target, nil
target[metric.Name] = agg
}

return target, nil
}

func (p *parser) getMetric(idx int) (*digest.Metric, error) {
if idx >= len(p.names) {
return nil, fmt.Errorf("%w: metric index out of range %d", errData, idx)
}

name := p.names[idx]

metric, found := p.metrics[name]
if !found {
return nil, fmt.Errorf("%w: unknown metric name %s", errData, name)
}

return metric, nil
}

func (p *parser) parseAggregate(data []float64, mt digest.MetricType) (digest.Aggregate, error) {
names := aggregateNames(mt)
if len(names) == 0 {
return nil, fmt.Errorf("%w: no metric names for type %s", errData, mt.String())
}

if len(data) != len(names) {
return nil, fmt.Errorf(
"%w: metric definition mismatch %d - %d - %s",
errData,
len(data),
len(names),
names,
)

// return nil, fmt.Errorf("%w: metric definition mismatch %s", errData, mt.String())
}

agg := make(digest.Aggregate, len(names))

for idx := range names {
agg[names[idx]] = data[idx]
}

return agg, nil
}

func aggregateNames(mtype digest.MetricType) []string {
switch mtype {
case digest.MetricTypeGauge:
return gaugeAggregateNames
case digest.MetricTypeRate:
return rateAggregateNames
case digest.MetricTypeCounter:
return counterAggregateNames
case digest.MetricTypeTrend:
return trendAggregateNames
default:
return nil, nil //nolint:nilnil
return nil
}
}

//nolint:gochecknoglobals
var (
gaugeAggregateNames = []string{"value"}
rateAggregateNames = []string{"rate"}
counterAggregateNames = []string{"count", "rate"}
trendAggregateNames = []string{"avg", "max", "med", "min", "p(90)", "p(95)", "p(99)"}
)

var errData = errors.New("invalid data")
4 changes: 3 additions & 1 deletion internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ func Subscribe(ctx context.Context, url string, sub chan tea.Msg) tea.Cmd {
sub <- &digest.Event{Type: digest.EventTypeDisconnect}
})

parser := newParser()

return client.SubscribeRawWithContext(ctx, func(msg *sse.Event) {
event, perr := parse(msg)
event, perr := parser.parse(msg)
if perr != nil {
sub <- perr

Expand Down
6 changes: 6 additions & 0 deletions releases/v0.2.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
xk6-top `v0.2.1` is here 🎉!

`v0.2.0` is a maintenance release, it does not contain new features.
The purpose of the release is to adapt to protocol changes.

From xk6-dashboard v0.7.3, the SSE protocol aggregate events have been optimized. This is an incompatible change that is addressed in this release. Older xk6-top versions do not work with newer xk6-dashboard (and k6) versions.

0 comments on commit 915a7c5

Please sign in to comment.