Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/tsdbrelay: enable version numbers, fix metadata, add error metrics #1859

Merged
merged 5 commits into from
Aug 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cmd/bosun/docker/build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
FROM debian:wheezy

RUN apt-get update && apt-get install -y \
automake \
curl \
git \
make \
openjdk-7-jdk \
python \
--no-install-recommends \
&& rm -rf /var/lib/apt/lists/*
automake \
curl \
git \
make \
openjdk-7-jdk \
python \
--no-install-recommends \
&& rm -rf /var/lib/apt/lists/*

ENV TSDB /tsdb
RUN git clone --single-branch --depth 1 git://github.com/OpenTSDB/opentsdb.git $TSDB && \
cd $TSDB && bash ./build.sh
cd $TSDB && bash ./build.sh

ENV GOPATH /go
ENV HBASEVER 1.1.2
Expand All @@ -24,7 +24,7 @@ RUN mkdir -p /hbase \
| tar -xzC /hbase \
&& mv /hbase/hbase-$HBASEVER /hbase/hbase

RUN curl -SL https://storage.googleapis.com/golang/go1.6.2.linux-amd64.tar.gz \
RUN curl -SL https://storage.googleapis.com/golang/go1.7.linux-amd64.tar.gz \
| tar -xzC /usr/local

COPY bosun $GOPATH/src/bosun.org/
Expand All @@ -33,6 +33,6 @@ WORKDIR $GOPATH/src/bosun.org
ENV PATH $PATH:/usr/local/go/bin:$GOPATH/bin

RUN go run build/build.go \
&& bosun -version \
&& scollector -version
# && tsdbrelay -version doesn't work. See https://github.com/bosun-monitor/bosun/issues/1792
&& bosun -version \
&& scollector -version \
&& tsdbrelay -version
87 changes: 56 additions & 31 deletions cmd/tsdbrelay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@ import (
"compress/gzip"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"runtime"
"strings"

version "bosun.org/_version"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@captncraig I'm curious why I had to put an alias here. Locally using go 1.6.3 it can't find the reference to version package and gives an error/removes the import if I try to use a default import or even the _ import. Scollector/Bosun seem to work fine without the alias.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't explain why. If it helps to remove the alias, go ahead, but it sounds like something odd may be going on with your go environment.


"bosun.org/cmd/tsdbrelay/denormalize"
"bosun.org/collect"
"bosun.org/metadata"
"bosun.org/opentsdb"
"bosun.org/slog"
"bosun.org/util"
)

Expand All @@ -29,6 +31,7 @@ var (
tsdbServer = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.")
logVerbose = flag.Bool("v", false, "enable verbose logging")
toDenormalize = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `__tagvalue.tagvalue.metric`")
flagVersion = flag.Bool("version", false, "Prints the version and exits.")

redisHost = flag.String("redis", "", "redis host for aggregating external counters")
redisDb = flag.Int("db", 0, "redis db to use for counters")
Expand All @@ -41,28 +44,34 @@ var (
denormalizationRules map[string]*denormalize.DenormalizationRule

relayPutUrls []string

tags = opentsdb.TagSet{}
)

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
var err error
myHost, err = os.Hostname()
if err != nil || myHost == "" {
myHost = "tsdbrelay"
}

flag.Parse()
if *flagVersion {
fmt.Println(version.GetVersionInfo("tsdbrelay"))
os.Exit(0)
}
if *bosunServer == "" || *tsdbServer == "" {
log.Fatal("must specify both bosun and tsdb server")
slog.Fatal("must specify both bosun and tsdb server")
}
log.Println("listen on", *listenAddr)
log.Println("relay to bosun at", *bosunServer)
log.Println("relay to tsdb at", *tsdbServer)
slog.Infoln(version.GetVersionInfo("tsdbrelay"))
slog.Infoln("listen on", *listenAddr)
slog.Infoln("relay to bosun at", *bosunServer)
slog.Infoln("relay to tsdb at", *tsdbServer)
if *toDenormalize != "" {
var err error
denormalizationRules, err = denormalize.ParseDenormalizationRules(*toDenormalize)
if err != nil {
log.Fatal(err)
slog.Fatal(err)
}
}

Expand Down Expand Up @@ -121,19 +130,30 @@ func main() {
Path: "/api/put",
}
if err = collect.Init(collectUrl, "tsdbrelay"); err != nil {
log.Fatal(err)
slog.Fatal(err)
}
log.Fatal(http.ListenAndServe(*listenAddr, nil))
}

func init() {
metadata.AddMetricMeta("tsdbrelay.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed")
metadata.AddMetricMeta("tsdbrelay.metadata.relayed", metadata.Counter, metadata.Count, "Number of successful metadata puts relayed")
if err := metadata.Init(collectUrl, false); err != nil {
slog.Fatal(err)
}
// Make sure these get zeroed out instead of going unknown on restart
collect.Add("puts.relayed", tags, 0)
collect.Add("puts.error", tags, 0)
collect.Add("metadata.relayed", tags, 0)
collect.Add("metadata.error", tags, 0)
collect.Add("additional.puts.relayed", tags, 0)
collect.Add("additional.puts.error", tags, 0)
metadata.AddMetricMeta("tsdbrelay.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to opentsdb target")
metadata.AddMetricMeta("tsdbrelay.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to opentsdb target")
metadata.AddMetricMeta("tsdbrelay.metadata.relayed", metadata.Counter, metadata.Count, "Number of successful metadata puts relayed to bosun target")
metadata.AddMetricMeta("tsdbrelay.metadata.error", metadata.Counter, metadata.Count, "Number of metadata puts that could not be relayed to bosun target")
metadata.AddMetricMeta("tsdbrelay.additional.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to additional targets")
metadata.AddMetricMeta("tsdbrelay.additional.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to additional targets")
slog.Fatal(http.ListenAndServe(*listenAddr, nil))
}

func verbose(format string, a ...interface{}) {
if *logVerbose {
log.Printf(format, a...)
slog.Infof(format, a...)
}
}

Expand Down Expand Up @@ -175,17 +195,18 @@ func (rp *relayProxy) relayPut(responseWriter http.ResponseWriter, r *http.Reque
w := &relayWriter{ResponseWriter: responseWriter}
rp.TSDBProxy.ServeHTTP(w, r)
if w.code/100 != 2 {
verbose("got status %d", w.code)
verbose("relayPut got status %d", w.code)
collect.Add("puts.error", tags, 1)
return
}
verbose("relayed to tsdb")
collect.Add("puts.relayed", opentsdb.TagSet{}, 1)
collect.Add("puts.relayed", tags, 1)
// Send to bosun in a separate go routine so we can end the source's request.
go func() {
body := bytes.NewBuffer(reader.buf.Bytes())
req, err := http.NewRequest(r.Method, bosunIndexURL, body)
if err != nil {
verbose("%v", err)
verbose("bosun connect error: %v", err)
return
}
resp, err := http.DefaultClient.Do(req)
Expand All @@ -207,19 +228,22 @@ func (rp *relayProxy) relayPut(responseWriter http.ResponseWriter, r *http.Reque
body := bytes.NewBuffer(reader.buf.Bytes())
req, err := http.NewRequest(r.Method, relayUrl, body)
if err != nil {
verbose("%v", err)
return
verbose("relayPutUrls connect error: %v", err)
collect.Add("puts.additional.error", tags, 1)
continue
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Add(relayHeader, myHost)
resp, err := http.DefaultClient.Do(req)
if err != nil {
verbose("secondary relay error: %v", err)
return
collect.Add("puts.additional.error", tags, 1)
continue
}
resp.Body.Close()
verbose("secondary relay success")
collect.Add("puts.additional.relayed", tags, 1)
}
}()
}
Expand All @@ -244,7 +268,7 @@ func (rp *relayProxy) denormalize(body io.Reader) {
if err = rule.Translate(dp); err == nil {
relayDps = append(relayDps, dp)
} else {
verbose(err.Error())
verbose("error translating points: %v", err.Error())
}
}
}
Expand All @@ -265,7 +289,7 @@ func (rp *relayProxy) denormalize(body io.Reader) {
}
req, err := http.NewRequest("POST", tsdbPutURL, buf)
if err != nil {
verbose("%v", err)
verbose("error posting denormalized data points: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
Expand All @@ -283,11 +307,12 @@ func (rp *relayProxy) relayMetadata(responseWriter http.ResponseWriter, r *http.
w := &relayWriter{ResponseWriter: responseWriter}
rp.BosunProxy.ServeHTTP(w, r)
if w.code != 204 {
verbose("got status %d", w.code)
verbose("relayMetadata got status %d", w.code)
collect.Add("metadata.error", tags, 1)
return
}
verbose("relayed metadata to bosun")
collect.Add("metadata.relayed", opentsdb.TagSet{}, 1)
collect.Add("metadata.relayed", tags, 1)
if r.Header.Get(relayHeader) != "" {
return
}
Expand All @@ -298,18 +323,18 @@ func (rp *relayProxy) relayMetadata(responseWriter http.ResponseWriter, r *http.
body := bytes.NewBuffer(reader.buf.Bytes())
req, err := http.NewRequest(r.Method, relayUrl, body)
if err != nil {
verbose("%v", err)
return
verbose("metadata relayPutUrls error %v", err)
continue
}
req.Header.Set("Content-Type", "application/json")
req.Header.Add(relayHeader, myHost)
resp, err := http.DefaultClient.Do(req)
if err != nil {
verbose("secondary relay error: %v", err)
return
verbose("secondary relay metadata error: %v", err)
continue
}
resp.Body.Close()
verbose("secondary relay success")
verbose("secondary relay metadata success")
}
}()
}
Expand Down
4 changes: 4 additions & 0 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -198,6 +199,9 @@ func Init(u *url.URL, debug bool) error {
if err != nil {
return err
}
if strings.HasPrefix(mh.Host, ":") {
mh.Host = "localhost" + mh.Host
}
metahost = mh.String()
metadebug = debug
go collectMetadata()
Expand Down