limiter: refactor for enter on multi-targets render
msaf1980 committed Jan 31, 2023
1 parent 607c7b4 commit 855f892
Showing 11 changed files with 140 additions and 66 deletions.
4 changes: 2 additions & 2 deletions autocomplete/autocomplete.go
@@ -339,7 +339,7 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
- status = clickhouse.HandleError(w, err)
+ status, _ = clickhouse.HandleError(w, err)
return
}
readBytes = int64(len(body))
@@ -579,7 +579,7 @@ func (h *Handler) ServeValues(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
- status = clickhouse.HandleError(w, err)
+ status, _ = clickhouse.HandleError(w, err)
return
}

2 changes: 1 addition & 1 deletion find/handler.go
@@ -181,7 +181,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
- status = clickhouse.HandleError(w, err)
+ status, _ = clickhouse.HandleError(w, err)
return
}

9 changes: 8 additions & 1 deletion helper/clickhouse/clickhouse.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/lomik/graphite-clickhouse/helper/errs"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/pkg/scope"

"go.uber.org/zap"
@@ -64,14 +65,20 @@ func extractClickhouseError(e string) (int, string) {
return http.StatusInternalServerError, "Storage error"
}

- func HandleError(w http.ResponseWriter, err error) (status int) {
+ func HandleError(w http.ResponseWriter, err error) (status int, queueFail bool) {
status = http.StatusOK
errStr := err.Error()
if err == ErrInvalidTimeRange {
status = http.StatusBadRequest
http.Error(w, errStr, status)
return
}
+ if err == limiter.ErrTimeout || err == limiter.ErrOverflow {
+ queueFail = true
+ status = http.StatusServiceUnavailable
+ http.Error(w, err.Error(), status)
+ return
+ }
if _, ok := err.(*ErrWithDescr); ok {
status, errStr = extractClickhouseError(errStr)
http.Error(w, errStr, status)
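
The second return value lets callers separate limiter queue rejections (HTTP 503) from ordinary storage errors. A minimal caller sketch, assuming a zap logger and an expvar counter that are not part of this commit:

package example

import (
	"expvar"
	"net/http"

	"github.com/lomik/graphite-clickhouse/helper/clickhouse"
	"go.uber.org/zap"
)

// writeError is illustrative only: it forwards the error to HandleError and
// counts limiter queue rejections separately, so 503s from the queue do not
// inflate storage-error metrics. The counter and logger are assumptions.
func writeError(w http.ResponseWriter, logger *zap.Logger, queueFails *expvar.Int, err error) {
	status, queueFail := clickhouse.HandleError(w, err)
	if queueFail {
		queueFails.Add(1) // request was rejected by the limiter queue
	}
	logger.Error("request failed", zap.Int("status", status), zap.Error(err))
}
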
1 change: 0 additions & 1 deletion helper/clickhouse/external-data.go
@@ -59,7 +59,6 @@ func (e *ExternalData) SetDebug(debugDir string, perm os.FileMode) {
e.debug = nil
}
e.debug = &extDataDebug{debugDir, perm}
- return
}

// buildBody returns multiform body, content type header and error
1 change: 0 additions & 1 deletion helper/clickhouse/external-data_test.go
@@ -91,7 +91,6 @@ func TestBuildBody(t *testing.T) {
b += "--" + contentID + "--\r\n"
assert.Equal(t, b, body.String(), "built body and expected body don't match")
}
- return
}

func TestDebugDump(t *testing.T) {
3 changes: 1 addition & 2 deletions limiter/interface.go
@@ -6,8 +6,7 @@ import (
)

var ErrTimeout = errors.New("timeout exceeded")
var ErrOverflow = errors.New("storage maximum read slot wait timeout")
var ErrConcurrency = errors.New("storage concurrent read slot wait timeout")
var ErrOverflow = errors.New("storage maximum queries exceeded")

type ServerLimiter interface {
Capacity() int
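
Both errors surface from the limiter's Enter/TryEnter calls used elsewhere in this commit; a minimal guard sketch, assuming only the Enabled/Enter/Leave methods shown in this diff and a wait timeout chosen by the caller:

package example

import (
	"context"
	"time"

	"github.com/lomik/graphite-clickhouse/limiter"
)

// withSlot is illustrative only: take a limiter slot (bounded by a caller-chosen
// wait timeout), run fn, and always release the slot. Enter fails with
// limiter.ErrTimeout or limiter.ErrOverflow, which HandleError maps to HTTP 503.
func withSlot(ctx context.Context, l limiter.ServerLimiter, wait time.Duration, fn func() error) error {
	if !l.Enabled() {
		return fn()
	}
	waitCtx, cancel := context.WithTimeout(ctx, wait)
	defer cancel()
	if err := l.Enter(waitCtx, "render"); err != nil {
		return err // ErrTimeout or ErrOverflow
	}
	defer l.Leave(waitCtx, "render")
	return fn()
}
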
4 changes: 2 additions & 2 deletions limiter/wlimiter.go
@@ -48,7 +48,7 @@ func (sl *WLimiter) Enter(ctx context.Context, s string) (err error) {
if sl.cL.enter(ctx, s) != nil {
sl.l.leave(ctx, s)
sl.m.WaitErrors.Add(1)
- err = ErrConcurrency
+ err = ErrTimeout
}
}
return
@@ -66,7 +66,7 @@ func (sl *WLimiter) TryEnter(ctx context.Context, s string) (err error) {
if sl.cL.tryEnter(ctx, s) != nil {
sl.l.leave(ctx, s)
sl.m.WaitErrors.Add(1)
- err = ErrConcurrency
+ err = ErrTimeout
}
}
return
32 changes: 28 additions & 4 deletions prometheus/querier_select.go
@@ -4,10 +4,12 @@
package prometheus

import (
"context"
"time"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/pkg/alias"
"github.com/lomik/graphite-clickhouse/render/data"
"github.com/prometheus/prometheus/model/labels"
@@ -17,12 +19,30 @@ import (
// override in unit tests for stable results
var timeNow = time.Now

- func (q *Querier) lookup(from, until int64, labelsMatcher ...*labels.Matcher) (*alias.Map, error) {
+ func (q *Querier) lookup(from, until int64, qlimiter limiter.ServerLimiter, queueDuration *time.Duration, labelsMatcher ...*labels.Matcher) (*alias.Map, error) {
terms, err := makeTaggedFromPromQL(labelsMatcher)
if err != nil {
return nil, err
}
- var stat finder.FinderStat
+ var (
+ stat finder.FinderStat
+ limitCtx context.Context
+ cancel context.CancelFunc
+ )
+ if qlimiter.Enabled() {
+ limitCtx, cancel = context.WithTimeout(q.ctx, q.config.ClickHouse.IndexTimeout)
+ defer cancel()
+ start := time.Now()
+ err = qlimiter.Enter(limitCtx, "render")
+ *queueDuration += time.Since(start)
+ if err != nil {
+ // status = http.StatusServiceUnavailable
+ // queueFail = true
+ // http.Error(w, err.Error(), status)
+ return nil, err
+ }
+ defer qlimiter.Leave(limitCtx, "render")
+ }
// TODO: implement use stat for Prometheus queries
fndResult, err := finder.FindTagged(q.config, q.ctx, terms, from, until, &stat)

@@ -67,8 +87,12 @@ func (q *Querier) timeRange(hints *storage.SelectHints) (int64, int64) {

// Select returns a set of series that matches the given label matchers.
func (q *Querier) Select(sortSeries bool, hints *storage.SelectHints, labelsMatcher ...*labels.Matcher) storage.SeriesSet {
+ var (
+ queueDuration time.Duration
+ )
from, until := q.timeRange(hints)
- am, err := q.lookup(from, until, labelsMatcher...)
+ qlimiter := data.GetQueryLimiterFrom("", q.config, from, until)
+ am, err := q.lookup(from, until, qlimiter, &queueDuration, labelsMatcher...)
if err != nil {
return nil //, nil, err @TODO
}
@@ -96,7 +120,7 @@ func (q *Querier) Select(sortSeries bool, hints *storage.SelectHints, labelsMatcher ...*labels.Matcher) storage.SeriesSet {
MaxDataPoints: maxDataPoints,
}: data.NewTargets([]string{}, am),
}
- reply, err := multiTarget.Fetch(q.ctx, q.config, config.ContextPrometheus)
+ reply, err := multiTarget.Fetch(q.ctx, q.config, config.ContextPrometheus, qlimiter, &queueDuration)
if err != nil {
return nil // , nil, err @TODO
}
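
Queue wait time is now accumulated across both the finder lookup and the data fetch; a sketch of that accounting idiom, assuming only the Enter signature shown above:

package example

import (
	"context"
	"time"

	"github.com/lomik/graphite-clickhouse/limiter"
)

// enterTimed is illustrative only: it mirrors the bookkeeping in lookup and
// Fetch, adding the time spent waiting for a slot to *queueDuration whether
// or not Enter succeeds.
func enterTimed(ctx context.Context, l limiter.ServerLimiter, key string, queueDuration *time.Duration) error {
	start := time.Now()
	err := l.Enter(ctx, key)
	*queueDuration += time.Since(start)
	return err
}
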
46 changes: 42 additions & 4 deletions render/data/multi_target.go
@@ -101,6 +101,21 @@ func GetQueryLimiter(username string, cfg *config.Config, m *MultiTarget) limite
return cfg.ClickHouse.QueryParams[n].Limiter
}

+ func GetQueryLimiterFrom(username string, cfg *config.Config, from, until int64) limiter.ServerLimiter {
+ n := 0
+ if username != "" && len(cfg.ClickHouse.UserLimits) > 0 {
+ if u, ok := cfg.ClickHouse.UserLimits[username]; ok {
+ return u.Limiter
+ }
+ }
+
+ if len(cfg.ClickHouse.QueryParams) > 1 {
+ n = config.GetQueryParam(cfg.ClickHouse.QueryParams, time.Second*time.Duration(until-from))
+ }
+
+ return cfg.ClickHouse.QueryParams[n].Limiter
+ }

func GetQueryParam(username string, cfg *config.Config, m *MultiTarget) (*config.QueryParam, int) {
n := 0

@@ -120,9 +135,12 @@ func GetQueryParam(username string, cfg *config.Config, m *MultiTarget) (*config.QueryParam, int) {
}

// Fetch fetches the parsed ClickHouse data returns CHResponses
- func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext string) (CHResponses, error) {
- var lock sync.RWMutex
- var wg sync.WaitGroup
+ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext string, qlimiter limiter.ServerLimiter, queueDuration *time.Duration) (CHResponses, error) {
+ var (
+ lock sync.RWMutex
+ wg sync.WaitGroup
+ entered int
+ )
logger := scope.Logger(ctx)
setCarbonlinkClient(&cfg.Carbonlink)

@@ -135,7 +153,12 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s
dataTimeout := getDataTimeout(cfg, m)

ctxTimeout, cancel := context.WithTimeout(ctx, dataTimeout)
- defer cancel()
+ defer func() {
+ for i := 0; i < entered; i++ {
+ qlimiter.Leave(ctxTimeout, "render")
+ }
+ cancel()
+ }()

errors := make([]error, 0, len(*m))
query := newQuery(cfg, len(*m))
@@ -154,6 +177,21 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s
logger.Error("data tables is not specified", zap.Error(err))
return EmptyResponse(), err
}
+ if qlimiter.Enabled() {
+ start := time.Now()
+ err = qlimiter.Enter(ctxTimeout, "render")
+ *queueDuration += time.Since(start)
+ if err != nil {
+ // status = http.StatusServiceUnavailable
+ // queueFail = true
+ // http.Error(w, err.Error(), status)
+ lock.Lock()
+ errors = append(errors, err)
+ lock.Unlock()
+ break
+ }
+ entered++
+ }
wg.Add(1)
go func(cond *conditions) {
defer wg.Done()
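
Fetch takes one limiter slot per target and releases every slot it actually acquired in the deferred cleanup, even on early exit. A reduced, single-goroutine sketch of that bookkeeping (the real Fetch runs each target concurrently):

package example

import (
	"context"

	"github.com/lomik/graphite-clickhouse/limiter"
)

// fetchAll is illustrative only: it counts successful Enter calls and releases
// exactly that many slots on exit, mirroring the entered counter in Fetch.
func fetchAll(ctx context.Context, l limiter.ServerLimiter, targets []string, run func(string) error) error {
	entered := 0
	defer func() {
		for i := 0; i < entered; i++ {
			l.Leave(ctx, "render")
		}
	}()
	for _, t := range targets {
		if l.Enabled() {
			if err := l.Enter(ctx, "render"); err != nil {
				return err // queue overflow or wait timeout
			}
			entered++
		}
		if err := run(t); err != nil {
			return err
		}
	}
	return nil
}
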
5 changes: 3 additions & 2 deletions render/data/targets.go
@@ -22,8 +22,9 @@ type Cache struct {
// Targets represents requested metrics
type Targets struct {
// List contains queried metrics, e.g. [metric.{name1,name2}, metric.name[3-9]]
- List []string
- Cache []Cache
+ List   []string
+ Cache  []Cache
+ Cached bool // all is cached
// AM stores found expanded metrics
AM *alias.Map
pointsTable string