Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up JSON output #1114

Merged
merged 4 commits into from
Sep 2, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 75 additions & 45 deletions stats/json/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package json

import (
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/afero"
Expand All @@ -34,20 +37,18 @@ import (
)

type Collector struct {
outfile io.WriteCloser
closeFn func() error
fname string
seenMetrics []string
}

// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}
encoder *json.Encoder

// Similar to ioutil.NopCloser, but for writers
type nopCloser struct {
io.Writer
buffer []stats.Sample
bufferLock sync.Mutex
na-- marked this conversation as resolved.
Show resolved Hide resolved
}

func (nopCloser) Close() error { return nil }
// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}

func (c *Collector) HasSeenMetric(str string) bool {
for _, n := range c.seenMetrics {
na-- marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -59,21 +60,35 @@ func (c *Collector) HasSeenMetric(str string) bool {
}

func New(fs afero.Fs, fname string) (*Collector, error) {
var c = &Collector{
fname: fname,
}
if fname == "" || fname == "-" {
return &Collector{
outfile: nopCloser{os.Stdout},
fname: "-",
}, nil
c.encoder = json.NewEncoder(os.Stdout)
c.closeFn = func() error {
return nil
}
return c, nil
}

logfile, err := fs.Create(fname)
logfile, err := fs.Create(c.fname)
if err != nil {
return nil, err
}
return &Collector{
outfile: logfile,
fname: fname,
}, nil

if strings.HasSuffix(c.fname, ".gz") {
var outfile = gzip.NewWriter(logfile)

c.closeFn = func() error {
_ = outfile.Close()
return logfile.Close()
}
c.encoder = json.NewEncoder(outfile)
} else {
c.closeFn = logfile.Close
c.encoder = json.NewEncoder(logfile)
}

return c, nil
}

func (c *Collector) Init() error {
Expand All @@ -83,9 +98,20 @@ func (c *Collector) Init() error {
func (c *Collector) SetRunStatus(status lib.RunStatus) {}

func (c *Collector) Run(ctx context.Context) {
logrus.WithField("filename", c.fname).Debug("JSON: Writing JSON metrics")
<-ctx.Done()
_ = c.outfile.Close()
logrus.Debug("InfluxDB: Running!")
na-- marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(time.Millisecond * 100)
defer func() {
_ = c.closeFn()
}()
for {
select {
case <-ticker.C:
na-- marked this conversation as resolved.
Show resolved Hide resolved
c.commit()
case <-ctx.Done():
c.commit()
return
}
}
}

func (c *Collector) HandleMetric(m *stats.Metric) {
Expand All @@ -94,44 +120,48 @@ func (c *Collector) HandleMetric(m *stats.Metric) {
}

c.seenMetrics = append(c.seenMetrics, m.Name)
env := WrapMetric(m)
row, err := json.Marshal(env)
err := c.encoder.Encode(WrapMetric(m))

if env == nil || err != nil {
logrus.WithField("filename", c.fname).Warning(
if err != nil {
logrus.WithField("filename", c.fname).WithField("error", err).Warning(
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
"JSON: Envelope is nil or Metric couldn't be marshalled to JSON")
return
}

row = append(row, '\n')
_, err = c.outfile.Write(row)
na-- marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logrus.WithField("filename", c.fname).Error("JSON: Error writing to file")
}
}

func (c *Collector) Collect(scs []stats.SampleContainer) {
c.bufferLock.Lock()
defer c.bufferLock.Unlock()
for _, sc := range scs {
c.buffer = append(c.buffer, sc.GetSamples()...)
}
}

func (c *Collector) commit() {
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
c.bufferLock.Unlock()
logrus.WithField("filename", c.fname).Debug("JSON: Writing JSON metrics")
var start = time.Now()
var count int
for _, sc := range samples {
var samples = sc.GetSamples()
count += len(samples)
for _, sample := range sc.GetSamples() {
sample := sample
c.HandleMetric(sample.Metric)

env := WrapSample(&sample)
row, err := json.Marshal(env)

if err != nil || env == nil {
// Skip metric if it can't be made into JSON or envelope is null.
logrus.WithField("filename", c.fname).Warning(
"JSON: Envelope is nil or Sample couldn't be marshalled to JSON")
continue
}
row = append(row, '\n')
_, err = c.outfile.Write(row)
err := c.encoder.Encode(WrapSample(&sample))
if err != nil {
logrus.WithField("filename", c.fname).Error("JSON: Error writing to file")
// Skip metric if it can't be made into JSON or envelope is null.
logrus.WithField("filename", c.fname).WithField("error", err).Warning(
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
"JSON: Sample couldn't be marshalled to JSON")
continue
}
}
}
logrus.WithField("filename", c.fname).WithField("t", time.Now().Sub(start)).
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
WithField("count", count).Debug("JSON: Wrote JSON metrics")
}

func (c *Collector) Link() string {
Expand Down