[read_index_segments] Always validate index segment checksums before reading/validating contents (#2835)
robskillington authored Nov 4, 2020
1 parent 5fdf1c2 commit 158ab10
Showing 7 changed files with 314 additions and 16 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -86,6 +86,7 @@ TOOLS := \
 	read_data_files \
 	read_index_files \
 	read_index_segments \
+	query_index_segments \
 	clone_fileset \
 	dtest \
 	verify_data_files \
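Assuming the repository's usual pattern of deriving one build target per entry in the TOOLS list, the new binary should be buildable with something like `make query_index_segments` (a hypothetical invocation inferred from the existing per-tool targets, not shown in this diff).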
256 changes: 256 additions & 0 deletions src/cmd/tools/query_index_segments/main/main.go
@@ -0,0 +1,256 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
	"errors"
	golog "log"
	"math"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/m3ninx/index"
	"github.com/m3db/m3/src/m3ninx/search/executor"
	"github.com/m3db/m3/src/query/generated/proto/prompb"
	"github.com/m3db/m3/src/query/parser/promql"
	"github.com/m3db/m3/src/query/storage"
	"github.com/m3db/m3/src/x/ident"
	xsync "github.com/m3db/m3/src/x/sync"

	"github.com/pborman/getopt"
	"github.com/prometheus/prometheus/pkg/labels"
	"go.uber.org/zap"
)

var (
	halfCPUs     = int(math.Max(float64(runtime.NumCPU()/2), 1))
	endlineBytes = []byte("\n")
)

func main() {
	var (
		optPathPrefix  = getopt.StringLong("path-prefix", 'p', "/var/lib/m3db", "Path prefix [e.g. /var/lib/m3db]")
		optNamespace   = getopt.StringLong("namespace", 'n', "", "Namespace to query")
		optQuery       = getopt.StringLong("query", 'q', "", "Query to issue to match time series (PromQL selector)")
		optConcurrency = getopt.IntLong("concurrency", 'c', halfCPUs, "Query concurrency")
		optValidate    = true
	)
	getopt.BoolVarLong(&optValidate, "validate", 'v', "Validate index segments before querying")
	getopt.Parse()

	logConfig := zap.NewDevelopmentConfig()
	log, err := logConfig.Build()
	if err != nil {
		golog.Fatalf("unable to create logger: %+v", err)
	}

	if *optNamespace == "" || *optQuery == "" {
		getopt.Usage()
		os.Exit(1)
	}

	run(runOptions{
		filePathPrefix: *optPathPrefix,
		namespace:      *optNamespace,
		query:          *optQuery,
		validate:       optValidate,
		concurrency:    *optConcurrency,
		log:            log,
	})
}

type runOptions struct {
	filePathPrefix string
	namespace      string
	query          string
	validate       bool
	concurrency    int
	log            *zap.Logger
}

func run(opts runOptions) {
	log := opts.log

	parseOpts := promql.NewParseOptions()
	parse := parseOpts.MetricSelectorFn()

	matchers, err := parse(opts.query)
	if err != nil {
		log.Fatal("could not create matchers", zap.Error(err))
	}

	labelMatchers, err := toLabelMatchers(matchers)
	if err != nil {
		log.Fatal("could not create label matchers", zap.Error(err))
	}

	query, err := storage.PromReadQueryToM3(&prompb.Query{
		Matchers:         labelMatchers,
		StartTimestampMs: 0,
		EndTimestampMs:   time.Now().UnixNano() / int64(time.Millisecond),
	})
	if err != nil {
		log.Fatal("could not create M3 fetch query", zap.Error(err))
	}

	indexQuery, err := storage.FetchQueryToM3Query(query, storage.NewFetchOptions())
	if err != nil {
		log.Fatal("could not create M3 index query", zap.Error(err))
	}

	fsOpts := fs.NewOptions().
		SetFilePathPrefix(opts.filePathPrefix)

	if opts.validate {
		// Validate checksums before reading and/or validating contents if set.
		fsOpts = fsOpts.SetIndexReaderAutovalidateIndexSegments(true)
	}

	var (
		nsID      = ident.StringID(opts.namespace)
		infoFiles = fs.ReadIndexInfoFiles(fsOpts.FilePathPrefix(), nsID,
			fsOpts.InfoReaderBufferSize())
		results     = make(map[string]struct{})
		resultsLock sync.Mutex
		wg          sync.WaitGroup
	)

	log.Info("starting query",
		zap.Int("concurrency", opts.concurrency),
		zap.Bool("validateSegments", opts.validate))

	workers := xsync.NewWorkerPool(opts.concurrency)
	workers.Init()

	for _, infoFile := range infoFiles {
		if err := infoFile.Err.Error(); err != nil {
			log.Error("unable to read index info file",
				zap.Stringer("namespace", nsID),
				zap.Error(err),
				zap.String("filepath", infoFile.Err.Filepath()),
			)
			continue
		}

		readResult, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
			ReaderOptions: fs.IndexReaderOpenOptions{
				Identifier:  infoFile.ID,
				FileSetType: persist.FileSetFlushType,
			},
			FilesystemOptions: fsOpts,
		})
		if err != nil {
			log.Fatal("unable to read segments from index fileset", zap.Error(err))
			return
		}

		wg.Add(1)
		workers.Go(func() {
			defer wg.Done()

			var readers []index.Reader
			for _, seg := range readResult.Segments {
				reader, err := seg.Reader()
				if err != nil {
					log.Fatal("segment reader error", zap.Error(err))
				}

				readers = append(readers, reader)
			}

			executor := executor.NewExecutor(readers)

			iter, err := executor.Execute(indexQuery.Query.SearchQuery())
			if err != nil {
				log.Fatal("search execute error", zap.Error(err))
			}

			fields := make(map[string]string)
			for iter.Next() {
				d := iter.Current()

				key := string(d.ID)

				resultsLock.Lock()
				_, ok := results[key]
				if !ok {
					results[key] = struct{}{}
				}
				resultsLock.Unlock()

				if ok {
					continue // Already printed.
				}

				for k := range fields {
					delete(fields, k)
				}
				for _, field := range d.Fields {
					fields[string(field.Name)] = string(field.Value)
				}

				log.Info("matched document",
					zap.String("id", key),
					zap.Any("fields", fields))
			}

			if err := iter.Err(); err != nil {
				log.Fatal("iterate err", zap.Error(err))
			}
			if err := iter.Close(); err != nil {
				log.Fatal("iterate close err", zap.Error(err))
			}
		})
	}

	wg.Wait()
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
	pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
	for _, m := range matchers {
		var mType prompb.LabelMatcher_Type
		switch m.Type {
		case labels.MatchEqual:
			mType = prompb.LabelMatcher_EQ
		case labels.MatchNotEqual:
			mType = prompb.LabelMatcher_NEQ
		case labels.MatchRegexp:
			mType = prompb.LabelMatcher_RE
		case labels.MatchNotRegexp:
			mType = prompb.LabelMatcher_NRE
		default:
			return nil, errors.New("invalid matcher type")
		}
		pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
			Type:  mType,
			Name:  []byte(m.Name),
			Value: []byte(m.Value),
		})
	}
	return pbMatchers, nil
}
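Given the flags registered in main above, a representative invocation of the new tool might look like `query_index_segments -p /var/lib/m3db -n default -q 'http_requests{status="500"}'`, where the namespace and PromQL selector are placeholders. `--concurrency` defaults to half the available CPUs, and `--validate` defaults to true, so index segment checksums are verified before the query runs.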
16 changes: 13 additions & 3 deletions src/cmd/tools/read_index_segments/main/main.go
@@ -95,7 +95,11 @@ func run(opts runOptions) {
 	log := opts.log
 
 	fsOpts := fs.NewOptions().
-		SetFilePathPrefix(opts.filePathPrefix)
+		SetFilePathPrefix(opts.filePathPrefix).
+		// Always validate checksums before reading and/or validating contents
+		// regardless of whether this is a validation run or just reading
+		// the raw files.
+		SetIndexReaderAutovalidateIndexSegments(true)
 
 	indexDirPath := fs.IndexDataDirPath(opts.filePathPrefix)

@@ -202,7 +206,7 @@ func readBlockSegments(

log.Info("reading block segments")

segments, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
readResult, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
ReaderOptions: fs.IndexReaderOpenOptions{
Identifier: infoFile.ID,
FileSetType: persist.FileSetFlushType,
@@ -214,7 +218,13 @@
 		return
 	}
 
-	for i, seg := range segments {
+	if readResult.Validated {
+		log.Info("validated segments")
+	} else {
+		log.Error("expected to validate segments but did not validate")
+	}
+
+	for i, seg := range readResult.Segments {
 		jw := json.NewWriter(out)
 		jw.BeginObject()

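For orientation, here is a minimal sketch of a caller that opts into the checksum autovalidation this commit wires up and then checks the `Validated` flag on the read result. It uses only calls visible in the diffs above; the path prefix and namespace are placeholder values and error handling is deliberately coarse:

```go
package main

import (
	"log"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

func main() {
	// Verify index segment checksums before any contents are read.
	fsOpts := fs.NewOptions().
		SetFilePathPrefix("/var/lib/m3db"). // placeholder path
		SetIndexReaderAutovalidateIndexSegments(true)

	nsID := ident.StringID("default") // placeholder namespace
	infoFiles := fs.ReadIndexInfoFiles(fsOpts.FilePathPrefix(), nsID,
		fsOpts.InfoReaderBufferSize())

	for _, infoFile := range infoFiles {
		if err := infoFile.Err.Error(); err != nil {
			log.Printf("skipping unreadable info file: %v", err)
			continue
		}

		readResult, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
			ReaderOptions: fs.IndexReaderOpenOptions{
				Identifier:  infoFile.ID,
				FileSetType: persist.FileSetFlushType,
			},
			FilesystemOptions: fsOpts,
		})
		if err != nil {
			log.Fatalf("unable to read segments: %v", err)
		}

		// With autovalidation enabled, the reader reports whether the
		// segments were actually checksum-validated.
		if !readResult.Validated {
			log.Printf("segments were not validated as expected")
		}
		log.Printf("read %d segments", len(readResult.Segments))
	}
}
```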
14 changes: 12 additions & 2 deletions src/cmd/tools/verify_data_files/main/main.go
@@ -165,6 +165,10 @@ func run(opts runOptions) {

log.Info("verifying file sets", zap.Int("numFileSets", len(fileSetFiles)))
for _, fileSet := range fileSetFiles {
if !fileSet.HasCompleteCheckpointFile() {
continue // Don't validate file sets without checkpoint file.
}

log.Info("verifying file set file", zap.Any("fileSet", fileSet))
if err := verifyFileSet(verifyFileSetOptions{
filePathPrefix: filePathPrefix,
@@ -301,10 +305,16 @@ func readEntry(
 	data checked.Bytes,
 	checksum uint32,
 ) (readEntryResult, error) {
-	if !utf8.Valid(id.Bytes()) {
+	idValue := id.Bytes()
+	if len(idValue) == 0 {
+		return readEntryResult{invalidID: true},
+			fmt.Errorf("invalid id: err=%s, as_string=%s, as_hex=%x",
+				"empty", idValue, idValue)
+	}
+	if !utf8.Valid(idValue) {
 		return readEntryResult{invalidID: true},
 			fmt.Errorf("invalid id: err=%s, as_string=%s, as_hex=%x",
-				"non-utf8", id.Bytes(), id.Bytes())
+				"non-utf8", idValue, idValue)
 	}
 
 	for tags.Next() {
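The strengthened check in `readEntry` boils down to one rule: an ID is readable only if it is non-empty and valid UTF-8. Below is that rule extracted into a self-contained helper; `validateID` is an illustrative name, not a function in this commit:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// validateID mirrors the checks added to readEntry: reject empty IDs,
// then reject IDs that are not valid UTF-8.
func validateID(idValue []byte) error {
	if len(idValue) == 0 {
		return fmt.Errorf("invalid id: err=%s, as_string=%s, as_hex=%x",
			"empty", idValue, idValue)
	}
	if !utf8.Valid(idValue) {
		return fmt.Errorf("invalid id: err=%s, as_string=%s, as_hex=%x",
			"non-utf8", idValue, idValue)
	}
	return nil
}

func main() {
	fmt.Println(validateID(nil))                // invalid: empty
	fmt.Println(validateID([]byte{0xff, 0xfe})) // invalid: non-utf8
	fmt.Println(validateID([]byte("cpu.user"))) // <nil>: valid
}
```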
