[tools] Add benchmarking support to read_data_files (#2986)
linasm authored Dec 7, 2020
1 parent ef87f84 commit 75ce6c3
Showing 1 changed file with 74 additions and 17 deletions.
src/cmd/tools/read_data_files/main/main.go (91 changes: 74 additions & 17 deletions)
@@ -41,18 +41,35 @@ import (
 	"go.uber.org/zap"
 )
 
-const snapshotType = "snapshot"
-const flushType = "flush"
+const (
+	snapshotType = "snapshot"
+	flushType    = "flush"
+)
+
+type benchmarkMode uint8
+
+const (
+	// benchmarkNone prints the data read to the standard output and does not measure performance.
+	benchmarkNone benchmarkMode = iota
+
+	// benchmarkSeries benchmarks time series read performance (skipping datapoint decoding).
+	benchmarkSeries
+
+	// benchmarkDatapoints benchmarks series read, including datapoint decoding.
+	benchmarkDatapoints
+)
 
 func main() {
 	var (
 		optPathPrefix  = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]")
-		optNamespace   = getopt.StringLong("namespace", 'n', "", "Namespace [e.g. metrics]")
+		optNamespace   = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]")
 		optShard       = getopt.Uint32Long("shard", 's', 0, "Shard [expected format uint32]")
 		optBlockstart  = getopt.Int64Long("block-start", 'b', 0, "Block Start Time [in nsec]")
 		volume         = getopt.Int64Long("volume", 'v', 0, "Volume number")
 		fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType))
 		idFilter       = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)")
+		benchmark      = getopt.StringLong(
+			"benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]")
 	)
 	getopt.Parse()
 
@@ -82,12 +99,30 @@ func main() {
 		log.Fatalf("unknown fileset type: %s", *fileSetTypeArg)
 	}
 
+	var benchMode benchmarkMode
+	switch *benchmark {
+	case "":
+	case "series":
+		benchMode = benchmarkSeries
+	case "datapoints":
+		benchMode = benchmarkDatapoints
+	default:
+		log.Fatalf("unknown benchmark type: %s", *benchmark)
+	}
+
 	bytesPool := tools.NewCheckedBytesPool()
 	bytesPool.Init()
 
 	encodingOpts := encoding.NewOptions().SetBytesPool(bytesPool)
 
 	fsOpts := fs.NewOptions().SetFilePathPrefix(*optPathPrefix)
 
+	var (
+		seriesCount    = 0
+		datapointCount = 0
+		start          = time.Now()
+	)
+
 	reader, err := fs.NewReader(bytesPool, fsOpts)
 	if err != nil {
 		log.Fatalf("could not create new reader: %v", err)
@@ -121,23 +156,45 @@ func main() {
 			continue
 		}
 
-		data.IncRef()
-		iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
-		for iter.Next() {
-			dp, _, annotation := iter.Current()
-			// Use fmt package so it goes to stdout instead of stderr
-			fmt.Printf("{id: %s, dp: %+v", id.String(), dp)
-			if len(annotation) > 0 {
-				fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation))
-			}
-			fmt.Println("}")
-		}
-		if err := iter.Err(); err != nil {
-			log.Fatalf("unable to iterate original data: %v", err)
-		}
-		iter.Close()
-
-		data.DecRef()
+		if benchMode != benchmarkSeries {
+			data.IncRef()
+
+			iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
+			for iter.Next() {
+				dp, _, annotation := iter.Current()
+				if benchMode == benchmarkNone {
+					// Use fmt package so it goes to stdout instead of stderr
+					fmt.Printf("{id: %s, dp: %+v", id.String(), dp)
+					if len(annotation) > 0 {
+						fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation))
+					}
+					fmt.Println("}")
+				}
+				datapointCount++
+			}
+			if err := iter.Err(); err != nil {
+				log.Fatalf("unable to iterate original data: %v", err)
+			}
+			iter.Close()
+
+			data.DecRef()
+		}
 		data.Finalize()
+		seriesCount++
 	}
 
+	if benchMode != benchmarkNone {
+		runTime := time.Since(start)
+		fmt.Printf("Running time: %s\n", runTime)
+		fmt.Printf("\n%d series read\n", seriesCount)
+		if runTime > 0 {
+			fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds())
+		}
+
+		if benchMode == benchmarkDatapoints {
+			fmt.Printf("\n%d datapoints decoded\n", datapointCount)
+			fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds())
+		}
+	}
 }
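
For context, a hypothetical invocation of the tool after this change might look like the sketch below; the binary name read_data_files and the path, shard, and block-start values are placeholders, while the flags themselves come from the getopt definitions in the diff above.

  # Default mode: decode datapoints and print them to stdout (no benchmarking).
  read_data_files -p /var/lib/m3db -n default -s 0 -b <block-start-in-nsec>

  # Benchmark raw series reads, skipping datapoint decoding.
  read_data_files -p /var/lib/m3db -n default -s 0 -b <block-start-in-nsec> -B series

  # Benchmark series reads including datapoint decoding.
  read_data_files -p /var/lib/m3db -n default -s 0 -b <block-start-in-nsec> -B datapoints

Per the code above, either benchmark mode reports the total running time, the number of series read, and series per second; datapoints mode additionally reports the number of datapoints decoded and datapoints per second.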
