Skip to content

Commit

Permalink
Not allowing invalid datapoints to ruin entire batches
Browse files Browse the repository at this point in the history
* Not allowing invalid datapoints to ruin entire batches

1. opentsdb.DataPoint.IsValid is largely unchanged. Determines if datapoint is valid to submit as is.
2. If collect gets a non-cleanable datapoint it silently drops it.
3. When you `Add` a datapoint, if the tags cannot be cleaned, it will log a message with (hopefully) a helpful line number, and drop it.

* Fix linux
  • Loading branch information
Craig Peterson authored Jun 16, 2016
1 parent bd32068 commit 7396a3a
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 30 deletions.
32 changes: 25 additions & 7 deletions cmd/scollector/collectors/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"sync"
Expand All @@ -17,6 +19,7 @@ import (
"bosun.org/cmd/scollector/conf"
"bosun.org/metadata"
"bosun.org/opentsdb"
"bosun.org/slog"
"bosun.org/util"
)

Expand Down Expand Up @@ -236,14 +239,23 @@ func AddTS(md *opentsdb.MultiDataPoint, name string, ts int64, value interface{}
if skipMetric(name) {
return
}
if b, ok := value.(bool); ok {
if b {
value = 1
} else {
value = 0

tags := t.Copy()
// if tags are not cleanable, log a message and skip it
if err := tags.Clean(); err != nil {
line := ""
//attempt to log where Add was called from
if _, filename, l, ok := runtime.Caller(1); ok {
if filepath.Base(filename) == "collectors.go" {
_, filename, l, ok = runtime.Caller(2)
}
if ok {
line = fmt.Sprintf("%s:%d", filepath.Base(filename), l)
}
}
slog.Errorf("Invalid tagset discovered: %s. Skipping datapoint. Added from: %s", tags.String(), line)
return
}
tags := t.Copy()
if host, present := tags["host"]; !present {
tags["host"] = util.Hostname
} else if host == "" {
Expand All @@ -258,8 +270,14 @@ func AddTS(md *opentsdb.MultiDataPoint, name string, ts int64, value interface{}
if desc != "" {
metadata.AddMeta(name, tags, "desc", desc, false)
}

tags = AddTags.Copy().Merge(tags)
if b, ok := value.(bool); ok {
if b {
value = 1
} else {
value = 0
}
}
d := opentsdb.DataPoint{
Metric: name,
Timestamp: ts,
Expand Down
14 changes: 13 additions & 1 deletion cmd/scollector/collectors/collectors_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package collectors

import "testing"
import (
"bosun.org/opentsdb"
"testing"
)

func TestIsDigit(t *testing.T) {
if IsDigit("1a3") {
Expand All @@ -10,3 +13,12 @@ func TestIsDigit(t *testing.T) {
t.Error("029: expected true")
}
}

func TestAddTS_Invalid(t *testing.T) {
mdp := &opentsdb.MultiDataPoint{}
ts := opentsdb.TagSet{"srv": "%%%"}
Add(mdp, "aaaa", 42, ts, "", "", "") //don't have a good way to tesst this automatically, but I look for a log message with this line number in it.
if len(*mdp) != 0 {
t.Fatal("Shouldn't have added invalid tags.")
}
}
2 changes: 1 addition & 1 deletion cmd/scollector/collectors/processes_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func NewWatchedProc(params conf.ProcessParams) (*WatchedProc, error) {
if params.Name == "" {
params.Name = params.Command
}
if !opentsdb.ValidTag(params.Name) {
if !opentsdb.ValidTSDBString(params.Name) {
return nil, fmt.Errorf("bad process name: %v", params.Name)
}
return &WatchedProc{
Expand Down
2 changes: 1 addition & 1 deletion cmd/scollector/collectors/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func parseTcollectorValue(line string) (*opentsdb.DataPoint, error) {
if err != nil {
return nil, fmt.Errorf("bad value: %s", sp[2])
}
if !opentsdb.ValidTag(sp[0]) {
if !opentsdb.ValidTSDBString(sp[0]) {
return nil, fmt.Errorf("bad metric: %s", sp[0])
}
dp := opentsdb.DataPoint{
Expand Down
9 changes: 6 additions & 3 deletions collect/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"sync/atomic"
"time"

"bosun.org/metadata"
Expand All @@ -15,12 +16,14 @@ import (

func queuer() {
for dp := range tchan {
if err := dp.Clean(); err != nil {
atomic.AddInt64(&dropped, 1)
continue // if anything gets this far that can't be made valid, just drop it silently.
}
qlock.Lock()
for {
if len(queue) > MaxQueueLen {
slock.Lock()
dropped++
slock.Unlock()
atomic.AddInt64(&dropped, 1)
break
}
queue = append(queue, dp)
Expand Down
28 changes: 13 additions & 15 deletions opentsdb/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type DataPoint struct {

// MarshalJSON verifies d is valid and converts it to JSON.
func (d *DataPoint) MarshalJSON() ([]byte, error) {
if err := d.clean(); err != nil {
if err := d.Clean(); err != nil {
return nil, err
}
return json.Marshal(struct {
Expand All @@ -89,7 +89,7 @@ func (d *DataPoint) MarshalJSON() ([]byte, error) {
// Valid returns whether d contains valid data (populated fields, valid tags)
// for submission to OpenTSDB.
func (d *DataPoint) Valid() bool {
if d.Metric == "" || d.Timestamp == 0 || d.Value == nil || !d.Tags.Valid() {
if d.Metric == "" || !ValidTSDBString(d.Metric) || d.Timestamp == 0 || d.Value == nil || !d.Tags.Valid() {
return false
}
if _, err := strconv.ParseFloat(fmt.Sprint(d.Value), 64); err != nil {
Expand Down Expand Up @@ -242,7 +242,7 @@ func (t TagSet) Valid() bool {
return err == nil
}

func (d *DataPoint) clean() error {
func (d *DataPoint) Clean() error {
if err := d.Tags.Clean(); err != nil {
return fmt.Errorf("cleaning tags for metric %s: %s", d.Metric, err)
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func Replace(s, replacement string) (string, error) {
replaced := false
for len(s) > 0 {
r, size := utf8.DecodeRuneInString(s)
if unicode.IsLetter(r) || unicode.IsDigit(r) || r == '-' || r == '_' || r == '.' || r == '/' {
if isRuneValid(r) {
c += string(r)
replaced = false
} else if !replaced {
Expand All @@ -326,6 +326,10 @@ func Replace(s, replacement string) (string, error) {
return c, nil
}

func isRuneValid(r rune) bool {
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == '-' || r == '_' || r == '.' || r == '/'
}

// MustReplace is like Replace, but returns an empty string on error.
func MustReplace(s, replacement string) string {
r, err := Replace(s, replacement)
Expand Down Expand Up @@ -574,15 +578,15 @@ func ParseTags(t string) (TagSet, error) {
if i > 0 {
continue
}
if !ValidTag(sp[i]) {
if !ValidTSDBString(sp[i]) {
err = fmt.Errorf("invalid character in %s", sp[i])
}
}
for _, s := range strings.Split(sp[1], "|") {
if s == "*" {
continue
}
if !ValidTag(s) {
if !ValidTSDBString(s) {
err = fmt.Errorf("invalid character in %s", sp[1])
}
}
Expand All @@ -594,19 +598,13 @@ func ParseTags(t string) (TagSet, error) {
return ts, err
}

// ValidTag returns true if s is a valid metric or tag.
func ValidTag(s string) bool {
// ValidTSDBString returns true if s is a valid metric or tag.
func ValidTSDBString(s string) bool {
if s == "" {
return false
}
for _, c := range s {
switch {
case c >= 'a' && c <= 'z':
case c >= 'A' && c <= 'Z':
case c >= '0' && c <= '9':
case strings.ContainsAny(string(c), `-_./`):
case unicode.IsLetter(c):
default:
if !isRuneValid(c) {
return false
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentsdb/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestQueryString(t *testing.T) {
}
}

func TestValidTag(t *testing.T) {
func TestValidTSDBString(t *testing.T) {
tests := map[string]bool{
"abcXYZ012_./-": true,

Expand All @@ -297,7 +297,7 @@ func TestValidTag(t *testing.T) {
"a=b": false,
}
for s, v := range tests {
r := ValidTag(s)
r := ValidTSDBString(s)
if v != r {
t.Errorf("%v: got %v, expected %v", s, r, v)
}
Expand Down

0 comments on commit 7396a3a

Please sign in to comment.