Skip to content

Commit

Permalink
Merge pull request #222 from msaf1980/logger
Browse files Browse the repository at this point in the history
Do active healthcheck for /alive
  • Loading branch information
msaf1980 authored Jan 23, 2023
2 parents f29b4c9 + ea745b9 commit 03535f6
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 51 deletions.
95 changes: 82 additions & 13 deletions autocomplete/autocomplete.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,29 @@ func (h *Handler) requestExpr(r *http.Request) (*where.Where, *where.Where, map[
return wr, pw, usedTags, nil
}

func taggedKey(typ string, truncateSec int32, fromDate, untilDate string, tag string, exprs []string, prefix string, limit int) (string, string) {
func taggedKey(typ string, truncateSec int32, fromDate, untilDate string, tag string, exprs []string, tagPrefix string, limit int) (string, string) {
ts := utils.TimestampTruncate(timeNow().Unix(), time.Duration(truncateSec)*time.Second)
var sb stringutils.Builder
sb.Grow(128)
sb.WriteString(typ)
sb.WriteString(fromDate)
sb.WriteString(";")
sb.WriteByte(';')
sb.WriteString(untilDate)
sb.WriteString(";limit=")
sb.WriteInt(int64(limit), 10)
sb.WriteString(";")
tagStart := sb.Len()
sb.WriteString(prefix)
sb.WriteString(";tag=")
sb.WriteString(tag)
if tagPrefix != "" {
sb.WriteString(";tagPrefix=")
sb.WriteString(tagPrefix)
}
if tag != "" {
sb.WriteString(";tag=")
sb.WriteString(tag)
}
for _, expr := range exprs {
sb.WriteString(";")
sb.WriteString(";expr='")
sb.WriteString(strings.Replace(expr, " = ", "=", 1))
sb.WriteByte('\'')
}
exprEnd := sb.Len()
sb.WriteString(";ts=")
Expand All @@ -132,11 +137,56 @@ func taggedKey(typ string, truncateSec int32, fromDate, untilDate string, tag st
return s, s[tagStart:exprEnd]
}

func taggedValuesKey(typ string, truncateSec int32, fromDate, untilDate string, tag string, exprs []string, valuePrefix string, limit int) (string, string) {
ts := utils.TimestampTruncate(timeNow().Unix(), time.Duration(truncateSec)*time.Second)
var sb stringutils.Builder
sb.Grow(128)
sb.WriteString(typ)
sb.WriteString(fromDate)
sb.WriteByte(';')
sb.WriteString(untilDate)
sb.WriteString(";limit=")
sb.WriteInt(int64(limit), 10)
tagStart := sb.Len()
if valuePrefix != "" {
sb.WriteString(";valuePrefix=")
sb.WriteString(valuePrefix)
}
if tag != "" {
sb.WriteString(";tag=")
sb.WriteString(tag)
}
for _, expr := range exprs {
sb.WriteString(";expr='")
sb.WriteString(strings.Replace(expr, " = ", "=", 1))
sb.WriteByte('\'')
}
exprEnd := sb.Len()
sb.WriteString(";ts=")
sb.WriteString(strconv.FormatInt(ts, 10))

s := sb.String()
return s, s[tagStart:exprEnd]
}

// func taggedTagsQuery(exprs []string, tagPrefix string, limit int) []string {
// query := make([]string, 0, 3+len(exprs))
// if tagPrefix != "" {
// query = append(query, "tagPrefix="+tagPrefix)
// }
// for _, expr := range exprs {
// query = append(query, "expr='"+expr+"'")
// }
// query = append(query, "limit="+strconv.Itoa(limit))
// return query
// }

func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) {
start := timeNow()
status := http.StatusOK
accessLogger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog)
logger := accessLogger.Named("autocomplete")
accessLogger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog).Named("http")
logger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog).Named("autocomplete")
r = r.WithContext(scope.WithLogger(r.Context(), logger))

var (
err error
Expand Down Expand Up @@ -193,11 +243,12 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) {
fromDate, untilDate := dateString(h.config.ClickHouse.TaggedAutocompleDays, start)

var key string
exprs := r.Form["expr"]
// params := taggedTagsQuery(exprs, tagPrefix, limit)

useCache := h.config.Common.FindCache != nil && h.config.Common.FindCacheConfig.FindTimeoutSec > 0 && !parser.TruthyBool(r.FormValue("noCache"))
if useCache {
exprs := r.Form["expr"]
key, _ = taggedKey("tags;", h.config.Common.FindCacheConfig.FindTimeoutSec, fromDate, untilDate, "", exprs, "tagPrefix="+tagPrefix, limit)
key, _ = taggedKey("tags;", h.config.Common.FindCacheConfig.FindTimeoutSec, fromDate, untilDate, "", exprs, tagPrefix, limit)
body, err = h.config.Common.FindCache.Get(key)
if err == nil {
if metrics.FinderCacheMetrics != nil {
Expand Down Expand Up @@ -333,7 +384,6 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) {
if len(tags) > limit {
tags = tags[:limit]
}

if useCache {
if findCache {
logger.Info("finder", zap.String("get_cache", key),
Expand All @@ -358,11 +408,27 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) {
w.Write(b)
}

// func taggedValuesQuery(tag string, exprs []string, valuePrefix string, limit int) []string {
// query := make([]string, 0, 3+len(exprs))
// if tag != "" {
// query = append(query, "tag="+tag)
// }
// if valuePrefix != "" {
// query = append(query, "valuePrefix="+valuePrefix)
// }
// for _, expr := range exprs {
// query = append(query, "expr='"+expr+"'")
// }
// query = append(query, "limit="+strconv.Itoa(limit))
// return query
// }

func (h *Handler) ServeValues(w http.ResponseWriter, r *http.Request) {
start := timeNow()
status := http.StatusOK
accessLogger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog).Named("http")
logger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog).Named("autocomplete")
r = r.WithContext(scope.WithLogger(r.Context(), logger))

var (
err error
Expand Down Expand Up @@ -422,11 +488,14 @@ func (h *Handler) ServeValues(w http.ResponseWriter, r *http.Request) {
fromDate, untilDate := dateString(h.config.ClickHouse.TaggedAutocompleDays, start)

var key string
exprs := r.Form["expr"]
// params := taggedValuesQuery(tag, exprs, valuePrefix, limit)

// taggedKey(tag, , "valuePrefix="+valuePrefix, limit)
useCache := h.config.Common.FindCache != nil && h.config.Common.FindCacheConfig.FindTimeoutSec > 0 && !parser.TruthyBool(r.FormValue("noCache"))
if useCache {
// logger = logger.With(zap.String("use_cache", "true"))
key, _ = taggedKey("values;", h.config.Common.FindCacheConfig.FindTimeoutSec, fromDate, untilDate, tag, r.Form["expr"], "valuePrefix="+valuePrefix, limit)
key, _ = taggedValuesKey("values;", h.config.Common.FindCacheConfig.FindTimeoutSec, fromDate, untilDate, tag, exprs, valuePrefix, limit)
body, err = h.config.Common.FindCache.Get(key)
if err == nil {
if metrics.FinderCacheMetrics != nil {
Expand Down
4 changes: 2 additions & 2 deletions find/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queueFail bool
queueDuration time.Duration
findCache bool
query string
)

username := w.Header().Get("X-Forwarded-User")
Expand Down Expand Up @@ -74,8 +75,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

r.ParseMultipartForm(1024 * 1024)

var query string

format := r.FormValue("format")
if format == "carbonapi_v3_pb" {
body, err := io.ReadAll(r.Body)
Expand Down Expand Up @@ -122,6 +121,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

var key string
// params := []string{query}
useCache := h.config.Common.FindCache != nil && h.config.Common.FindCacheConfig.FindTimeoutSec > 0 && !parser.TruthyBool(r.FormValue("noCache"))
if useCache {
ts := utils.TimestampTruncate(time.Now().Unix(), time.Duration(h.config.Common.FindCacheConfig.FindTimeoutSec)*time.Second)
Expand Down
6 changes: 2 additions & 4 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lomik/graphite-clickhouse/capabilities"
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/find"
"github.com/lomik/graphite-clickhouse/healthcheck"
"github.com/lomik/graphite-clickhouse/index"
"github.com/lomik/graphite-clickhouse/logs"
"github.com/lomik/graphite-clickhouse/metrics"
Expand Down Expand Up @@ -180,6 +181,7 @@ func main() {
mux.Handle("/render/", app.Handler(render.NewHandler(cfg)))
mux.Handle("/tags/autoComplete/tags", app.Handler(autocomplete.NewTags(cfg)))
mux.Handle("/tags/autoComplete/values", app.Handler(autocomplete.NewValues(cfg)))
mux.Handle("/alive", app.Handler(healthcheck.NewHandler(cfg)))
mux.HandleFunc("/debug/config", func(w http.ResponseWriter, r *http.Request) {

status := http.StatusOK
Expand All @@ -200,10 +202,6 @@ func main() {
}
w.Write(b)
})
mux.HandleFunc("/alive", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Graphite-clickhouse is alive.\n")
})

if cfg.Prometheus.Listen != "" {
if err := prometheus.Run(cfg); err != nil {
Expand Down
107 changes: 107 additions & 0 deletions healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package healthcheck

import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/msaf1980/go-stringutils"
"go.uber.org/zap"
)

// Handler serves /render requests
type Handler struct {
config *config.Config
last int64
failed int32
}

// NewHandler generates new *Handler
func NewHandler(config *config.Config) *Handler {
h := &Handler{
config: config,
failed: 1,
}

return h
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
query string
failed int32
)
if h.config.ClickHouse.IndexTable != "" {
// non-existing name with wrong level
query = "SELECT Path FROM " + h.config.ClickHouse.IndexTable + " WHERE ((Level=20002) AND (Path IN ('NonExistient','NonExistient.'))) AND (Date='1970-02-12') GROUP BY Path FORMAT TabSeparatedRaw"
} else if h.config.ClickHouse.TaggedTable != "" {
// non-existing partition
query = "SELECT Path FROM " + h.config.ClickHouse.TaggedTable + " WHERE (Tag1='__name__=NonExistient') AND (Date='1970-02-12') GROUP BY Path FORMAT TabSeparatedRaw"
}
if query != "" {
failed = 1
now := time.Now().Unix()
for {
last := atomic.LoadInt64(&h.last)
if now-last < 10 {
failed = atomic.LoadInt32(&h.failed)
break
}
// one query in 10 seconds for prevent overloading
if !atomic.CompareAndSwapInt64(&h.last, last, now) {
continue
}

logger := scope.LoggerWithHeaders(r.Context(), r, h.config.Common.HeadersToLog).Named("healthcheck")

client := http.Client{
Timeout: 2 * time.Second,
}
var u string
if pos := strings.Index(h.config.ClickHouse.URL, "/?"); pos > 0 {
u = h.config.ClickHouse.URL[:pos+2] + "query=" + url.QueryEscape(query)
} else {
u = h.config.ClickHouse.URL + "/?query=" + url.QueryEscape(query)
}

req, _ := http.NewRequest("GET", u, nil)
resp, err := client.Do(req)
if err != nil {
logger.Error("healthcheck error",
zap.Error(err),
)
}

defer resp.Body.Close()
if body, err := io.ReadAll(resp.Body); err == nil {
if resp.StatusCode == http.StatusOK {
failed = 0
} else {
failed = 1
logger.Error("healthcheck error",
zap.String("error", stringutils.UnsafeString(body)),
)
}
} else {
failed = 1
logger.Error("healthcheck error",
zap.Error(err),
)
}
atomic.StoreInt32(&h.failed, failed)
break
}
}
if failed > 0 {
http.Error(w, "Storage healthcheck failed", http.StatusServiceUnavailable)
} else {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Graphite-clickhouse is alive.\n")
}
}
Loading

0 comments on commit 03535f6

Please sign in to comment.