Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically encode/decode data and query files whose filenames end in .gz #261

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -828,11 +828,8 @@ github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=
github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8=
github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY=
Expand Down Expand Up @@ -902,6 +899,7 @@ github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84/go.mod h1:rkhy
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -1136,7 +1134,6 @@ golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200821140526-fda516888d29/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200908134130-d2e65c121b96 h1:gJciq3lOg0eS9fSZJcoHfv7q1BfC6cJfnmSSKL1yu3Q=
golang.org/x/sys v0.0.0-20200908134130-d2e65c121b96/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa h1:ZYxPR6aca/uhfRJyaOAtflSHjJYiktO7QnJC5ut7iY4=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
20 changes: 17 additions & 3 deletions internal/inputs/generator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"sort"

"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/pkg/data"
"github.com/timescale/tsbs/pkg/data/serialize"
"github.com/timescale/tsbs/pkg/data/usecases"
Expand Down Expand Up @@ -36,6 +37,10 @@ type DataGenerator struct {
// bufOut represents the buffered writer that should actually be passed to
// any operations that write out data.
bufOut *bufio.Writer

// closerOut is a potential closer for bufOut. It should be closed
// when done writing as the underlying Writer can have buffered data too (e.g. gzip).
closerOut io.Closer
}

func (g *DataGenerator) init(config common.GeneratorConfig) error {
Expand All @@ -57,7 +62,7 @@ func (g *DataGenerator) init(config common.GeneratorConfig) error {
if g.Out == nil {
g.Out = os.Stdout
}
g.bufOut, err = getBufferedWriter(g.config.File, g.Out)
g.bufOut, g.closerOut, err = utils.GetBufferedWriter(g.config.File, g.Out)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,8 +106,17 @@ func (g *DataGenerator) CreateSimulator(config *common.DataGeneratorConfig) (com
return scfg.NewSimulator(g.config.LogInterval, g.config.Limit), nil
}

// close flushes the buffered writer first and then closes the optional
// closer, so that buffered bytes reach the underlying writer before it is
// finalized. Either field may be nil, in which case it is skipped.
// Errors returned by Flush and Close are discarded.
func (g *DataGenerator) close() {
	if out := g.bufOut; out != nil {
		out.Flush()
	}
	if c := g.closerOut; c != nil {
		c.Close()
	}
}

func (g *DataGenerator) runSimulator(sim common.Simulator, serializer serialize.PointSerializer, dgc *common.DataGeneratorConfig) error {
defer g.bufOut.Flush()
defer g.close()

currGroupID := uint(0)
point := data.NewPoint()
Expand Down Expand Up @@ -139,7 +153,7 @@ func (g *DataGenerator) getSerializer(sim common.Simulator, target targets.Imple
return target.Serializer(), nil
}

//TODO should be implemented in targets package
// TODO should be implemented in targets package
func (g *DataGenerator) writeHeader(headers *common.GeneratedDataHeaders) {
g.bufOut.WriteString("tags")

Expand Down
18 changes: 16 additions & 2 deletions internal/inputs/generator_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
errUnknownUseCaseFmt = "use case '%s' is undefined"
errCannotParseTimeFmt = "cannot parse time from string '%s': %v"
errBadUseFmt = "invalid use case specified: '%v'"
errUnknownFormatFmt = "unknown format: '%s'"
)

// DevopsGeneratorMaker creates a query generator for devops use case
Expand Down Expand Up @@ -64,6 +65,10 @@ type QueryGenerator struct {
// bufOut represents the buffered writer that should actually be passed to
// any operations that write out data.
bufOut *bufio.Writer

// closerOut is a potential closer for bufOut. It should be closed
// when done writing as the underlying Writer can have buffered data too (e.g. gzip).
closerOut io.Closer
}

// NewQueryGenerator returns a QueryGenerator that is set up to work with a given
Expand Down Expand Up @@ -132,7 +137,7 @@ func (g *QueryGenerator) init(conf common.GeneratorConfig) error {
if g.Out == nil {
g.Out = os.Stdout
}
g.bufOut, err = getBufferedWriter(g.conf.File, g.Out)
g.bufOut, g.closerOut, err = internalUtils.GetBufferedWriter(g.conf.File, g.Out)
if err != nil {
return err
}
Expand Down Expand Up @@ -201,11 +206,20 @@ func (g *QueryGenerator) getUseCaseGenerator(c *config.QueryGeneratorConfig) (qu
}
}

// close flushes the buffered writer first and then closes the optional
// closer, so that buffered bytes reach the underlying writer before it is
// finalized. Either field may be nil, in which case it is skipped.
// Errors returned by Flush and Close are discarded.
func (g *QueryGenerator) close() {
	if out := g.bufOut; out != nil {
		out.Flush()
	}
	if c := g.closerOut; c != nil {
		c.Close()
	}
}

func (g *QueryGenerator) runQueryGeneration(useGen queryUtils.QueryGenerator, filler queryUtils.QueryFiller, c *config.QueryGeneratorConfig) error {
stats := make(map[string]int64)
currentGroup := uint(0)
enc := gob.NewEncoder(g.bufOut)
defer g.bufOut.Flush()
defer g.close()

rand.Seed(g.conf.Seed)
//fmt.Println(g.config.Seed)
Expand Down
27 changes: 0 additions & 27 deletions internal/inputs/utils.go

This file was deleted.

42 changes: 42 additions & 0 deletions internal/utils/buffered_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package utils

import (
"bufio"
"compress/gzip"
"fmt"
"io"
"os"
"strings"
)

const (
defaultReadSize = 4 << 20 // 4 MB
)

// GetBufferedReader wraps the input source for data or queries in a buffered Reader.
// A non-empty fileName selects that file as the source; an empty fileName selects the fallback Reader.
// If the fileName ends in .gz, the file content is transparently decompressed while reading.
func GetBufferedReader(fileName string, fallback io.Reader) (*bufio.Reader, error) {
	src, openErr := fileReader(fileName, fallback)
	if openErr != nil {
		return nil, fmt.Errorf("cannot open file for read %s: %w", fileName, openErr)
	}
	buffered := bufio.NewReaderSize(src, defaultReadSize)
	return buffered, nil
}

// fileReader returns a Reader for the given file name.
//
//   - An empty fileName yields the fallback Reader unchanged.
//   - A fileName ending in ".gz" yields a Reader that decompresses the file.
//   - Any other fileName yields the opened file itself.
//
// On error the returned Reader is a literal nil and no file is left open.
func fileReader(fileName string, fallback io.Reader) (io.Reader, error) {
	if len(fileName) == 0 {
		return fallback, nil
	}
	file, err := os.Open(fileName)
	if err != nil {
		return nil, err
	}
	if !strings.HasSuffix(fileName, ".gz") {
		return file, nil
	}
	gz, err := gzip.NewReader(file)
	if err != nil {
		// The gzip header could not be read: release the descriptor instead
		// of leaking it, and return an untyped nil so callers never see a
		// non-nil interface wrapping a nil *gzip.Reader.
		file.Close()
		return nil, err
	}
	return gz, nil
}
43 changes: 43 additions & 0 deletions internal/utils/buffered_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package utils

import (
"bufio"
"compress/gzip"
"fmt"
"io"
"os"
"strings"
)

const (
defaultWriteSize = 4 << 20 // 4 MB
)

// GetBufferedWriter builds the buffered Writer used when saving data or queries,
// together with an optional Closer for the destination behind it.
// A non-empty fileName selects that file as the destination; an empty fileName selects the fallback Writer.
// If the fileName ends in .gz, the output is gzip-compressed.
// The returned Closer is nil when writing to the fallback; when it is non-nil it
// should be closed after the buffered Writer has been flushed.
func GetBufferedWriter(fileName string, fallback io.Writer) (*bufio.Writer, io.Closer, error) {
	dst, dstCloser, openErr := fileWriter(fileName, fallback)
	if openErr != nil {
		return nil, nil, fmt.Errorf("cannot open file for write %s: %w", fileName, openErr)
	}
	buffered := bufio.NewWriterSize(dst, defaultWriteSize)
	return buffered, dstCloser, nil
}

// fileWriter returns a Writer for the given file name together with the
// Closer that finalizes it.
//
//   - An empty fileName yields the fallback Writer and a nil Closer.
//   - A fileName ending in ".gz" yields a gzip Writer on top of the created
//     file; its Closer finishes the gzip stream and then closes the file.
//   - Any other fileName yields the created file as both Writer and Closer.
//
// The file is created (or truncated) with os.Create.
func fileWriter(fileName string, fallback io.Writer) (io.Writer, io.Closer, error) {
	if len(fileName) == 0 {
		return fallback, nil, nil
	}
	file, err := os.Create(fileName)
	if err != nil {
		return nil, nil, err
	}
	if !strings.HasSuffix(fileName, ".gz") {
		return file, file, nil
	}
	gzFile := gzip.NewWriter(file)
	// gzip.Writer.Close does not close the underlying file, so hand back a
	// closer that takes care of both; otherwise the descriptor would leak.
	return gzFile, &gzipFileCloser{gz: gzFile, file: file}, nil
}

// gzipFileCloser closes a gzip stream and then the file it writes to.
type gzipFileCloser struct {
	gz   *gzip.Writer
	file *os.File
}

// Close flushes and terminates the gzip stream, then closes the file.
// The file is closed even if finishing the gzip stream fails; the first
// error encountered is returned.
func (c *gzipFileCloser) Close() error {
	gzErr := c.gz.Close()
	fileErr := c.file.Close()
	if gzErr != nil {
		return gzErr
	}
	return fileErr
}
15 changes: 4 additions & 11 deletions load/buffered_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,17 @@ package load
import (
"bufio"
"os"
)

const (
defaultReadSize = 4 << 20 // 4 MB
"github.com/timescale/tsbs/internal/utils"
)

// GetBufferedReader returns the buffered Reader that should be used by the file loader
// if no file name is specified a buffer for STDIN is returned
func GetBufferedReader(fileName string) *bufio.Reader {
if len(fileName) == 0 {
// Read from STDIN
return bufio.NewReaderSize(os.Stdin, defaultReadSize)
}
// Read from specified file
file, err := os.Open(fileName)
buf, err := utils.GetBufferedReader(fileName, os.Stdin)
if err != nil {
fatal("cannot open file for read %s: %v", fileName, err)
fatal("%v", err)
return nil
}
return bufio.NewReaderSize(file, defaultReadSize)
return buf
}
17 changes: 5 additions & 12 deletions pkg/query/benchmarker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
"time"

"github.com/spf13/pflag"
"github.com/timescale/tsbs/internal/utils"
"golang.org/x/time/rate"
)

const (
labelAllQueries = "all queries"
labelColdQueries = "cold queries"
labelWarmQueries = "warm queries"

defaultReadSize = 4 << 20 // 4 MB
)

// BenchmarkRunnerConfig is the configuration of the benchmark runner.
Expand Down Expand Up @@ -119,16 +118,10 @@ type Processor interface {
// GetBufferedReader returns the buffered Reader that should be used by the loader
func (b *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
if b.br == nil {
if len(b.FileName) > 0 {
// Read from specified file
file, err := os.Open(b.FileName)
if err != nil {
panic(fmt.Sprintf("cannot open file for read %s: %v", b.FileName, err))
}
b.br = bufio.NewReaderSize(file, defaultReadSize)
} else {
// Read from STDIN
b.br = bufio.NewReaderSize(os.Stdin, defaultReadSize)
var err error
b.br, err = utils.GetBufferedReader(b.FileName, os.Stdin)
if err != nil {
panic(err.Error())
}
}
return b.br
Expand Down