Add simple cache for grpc render #492

Merged: 1 commit, Sep 1, 2022
carbonserver/fetchsinglemetric.go: 6 changes (4 additions, 2 deletions)
@@ -74,14 +74,16 @@ func (r response) proto2() *protov2.FetchResponse {
StartTime: int32(r.StartTime),
StopTime: int32(r.StopTime),
StepTime: int32(r.StepTime),
Values: r.Values,
Values: make([]float64, len(r.Values)),
IsAbsent: make([]bool, len(r.Values)),
}

for i, p := range resp.Values {
for i, p := range r.Values {
if math.IsNaN(p) {
resp.Values[i] = 0
resp.IsAbsent[i] = true
} else {
resp.Values[i] = p
}
}

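Note on the hunk above: proto2() previously aliased r.Values and zeroed NaN entries in place; presumably that becomes unsafe once responses are kept in a cache and may be converted more than once, so the new code copies into a fresh slice instead. A minimal, self-contained sketch of the copy-on-convert behaviour (illustrative names, not the PR's code):

// Sketch: converting a cached series without mutating the source slice.
// With the old in-place approach, a second conversion of the same cached
// response would see 0 where the NaN used to be and mark the point present.
package main

import (
	"fmt"
	"math"
)

type converted struct {
	Values   []float64
	IsAbsent []bool
}

func convertCopy(src []float64) converted {
	out := converted{
		Values:   make([]float64, len(src)),
		IsAbsent: make([]bool, len(src)),
	}
	for i, p := range src {
		if math.IsNaN(p) {
			out.Values[i] = 0
			out.IsAbsent[i] = true
		} else {
			out.Values[i] = p
		}
	}
	return out
}

func main() {
	cached := []float64{1, math.NaN(), 3}
	_ = convertCopy(cached)  // first render served from the cache
	r := convertCopy(cached) // second render still sees the original NaN
	fmt.Println(r.Values, r.IsAbsent) // [1 0 3] [false true false]
}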
carbonserver/render.go: 172 changes (106 additions, 66 deletions)
@@ -283,6 +283,40 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req

}

func (listener *CarbonserverListener) getRenderCacheKeyAndSize(targets map[timeRange][]target, format string) (string, uint64) {
targetKeys := make([]string, 0, len(targets))
for tr, ts := range targets {
names := make([]string, 0, len(ts))
for _, t := range ts {
names = append(names, t.Name)
}
targetKeys = append(targetKeys, fmt.Sprintf("%s&%d&%d", strings.Join(names, "&"), tr.from, tr.until))
}
// TODO(gmagnusson): Our cache key changes if we permute any of the
// metric names while keeping the from/until pairs fixed, or permute
// any combination of metric names and from/until pairs as a unit.
// These permutations do not change the response, so we should be more
// clever about how we construct the cache key.
//
// An option is to sort the metric names within each from/until pair,
// and to order (names,from,until) triples by the from timestamp.
//
// This is probably worth doing, as the only source of differences in
// from/until pairs I know of comes from using timeShift or timeStack,
// which again probably comes from trying to compare trends from last
// week to what's going on now, which is something people are generally
// interested in. It wouldn't be good to start adding false negative
// cache misses to those kinds of queries.
key := fmt.Sprintf("%s&%s", strings.Join(targetKeys, "&"), format)
size := uint64(100 * 1024 * 1024)
renderRequests := atomic.LoadUint64(&listener.metrics.RenderRequests)
fetchSize := atomic.LoadUint64(&listener.metrics.FetchSize)
if renderRequests > 0 {
size = fetchSize / renderRequests
}
return key, size
}
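The TODO above is only a note; for illustration, here is a sketch of the normalization it proposes: sort the metric names inside each (from, until) pair and order the pairs by their from timestamp, so reordered targets produce the same key. This is not part of the PR and assumes the package's timeRange and target types used above, plus the fmt, sort, and strings imports:

// Hypothetical helper (not in the PR): order-insensitive render cache key.
func normalizedRenderCacheKey(targets map[timeRange][]target, format string) string {
	trs := make([]timeRange, 0, len(targets))
	for tr := range targets {
		trs = append(trs, tr)
	}
	// Order the (names, from, until) triples by time range.
	sort.Slice(trs, func(i, j int) bool {
		if trs[i].from != trs[j].from {
			return trs[i].from < trs[j].from
		}
		return trs[i].until < trs[j].until
	})

	targetKeys := make([]string, 0, len(trs))
	for _, tr := range trs {
		names := make([]string, 0, len(targets[tr]))
		for _, t := range targets[tr] {
			names = append(names, t.Name)
		}
		// Permuting the targets of one time range no longer changes the key.
		sort.Strings(names)
		targetKeys = append(targetKeys, fmt.Sprintf("%s&%d&%d", strings.Join(names, "&"), tr.from, tr.until))
	}
	return fmt.Sprintf("%s&%s", strings.Join(targetKeys, "&"), format)
}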

func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, bool, error) {
logger = logger.With(
zap.String("function", "fetchWithCache"),
@@ -292,38 +326,7 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger

var response fetchResponse
if listener.queryCacheEnabled {
targetKeys := make([]string, 0, len(targets))
for tr, ts := range targets {
names := make([]string, 0, len(ts))
for _, t := range ts {
names = append(names, t.Name)
}
targetKeys = append(targetKeys, fmt.Sprintf("%s&%d&%d", strings.Join(names, "&"), tr.from, tr.until))
}
key := fmt.Sprintf("%s&%s", strings.Join(targetKeys, "&"), format)

size := uint64(100 * 1024 * 1024)
renderRequests := atomic.LoadUint64(&listener.metrics.RenderRequests)
fetchSize := atomic.LoadUint64(&listener.metrics.FetchSize)
if renderRequests > 0 {
size = fetchSize / renderRequests
}

// TODO(gmagnusson): Our cache key changes if we permute any of the
// metric names while keeping the from/until pairs fixed, or permute
// any combination of metric names and from/until pairs as a unit.
// These permutations do not change the response, so we should be more
// clever about how we construct the cache key.
//
// An option is to sort the metric names within each from/until pair,
// and to order (names,from,until) triples by the from timestamp.
//
// This is probably worth doing, as the only source of differences in
// from/until pairs I know of comes from using timeShift or timeStack,
// which again probably comes from trying to compare trends from last
// week to what's going on now, which is something people are generally
// interested in. It wouldn't be good to start adding false negative
// cache misses to those kinds of queries.
key, size := listener.getRenderCacheKeyAndSize(targets, format.String())
var res interface{}
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
@@ -587,6 +590,28 @@ func (listener *CarbonserverListener) fetchData(metric, pathExpression string, f
return multi, nil
}

func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_RenderServer, responseChan chan response, storeAndGetMetrics bool) (responses []response, metricsFetched, valuesFetched int, err error) {
var metricAccessBatch []string
for renderResponse := range responseChan {
err = stream.Send(renderResponse.proto2())
if err != nil {
return
}
if storeAndGetMetrics {
responses = append(responses, renderResponse)
}
metricsFetched++
valuesFetched += len(renderResponse.Values)
if listener.internalStatsDir != "" {
metricAccessBatch = append(metricAccessBatch, renderResponse.Name)
}
}
if listener.internalStatsDir != "" {
listener.UpdateMetricsAccessTimesByRequest(metricAccessBatch)
}
return
}
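streamMetrics only drains responseChan; both callers rely on the producer closing the channel so the range loop ends. A tiny, self-contained sketch of that contract (simplified types, not the listener's code):

// Sketch of the producer/consumer contract around responseChan.
package main

import "fmt"

type response struct {
	Name   string
	Values []float64
}

func produce(rs []response, out chan<- response) {
	for _, r := range rs {
		out <- r
	}
	close(out) // without the close, the consumer's range would block forever
}

func consume(in <-chan response) (metricsFetched, valuesFetched int) {
	for r := range in {
		metricsFetched++
		valuesFetched += len(r.Values)
	}
	return metricsFetched, valuesFetched
}

func main() {
	ch := make(chan response, 1000) // same buffer size the listener uses
	go produce([]response{{Name: "a", Values: []float64{1, 2, 3}}}, ch)
	m, v := consume(ch)
	fmt.Println(m, v) // 1 3
}

In the PR, prepareDataStream is presumably the goroutine that closes responseChan on the fetch path, while the cache-hit path further down spawns its own goroutine that replays the cached responses and closes the channel explicitly.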

// Render implements Render rpc of CarbonV2 gRPC service
func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) {
t0 := time.Now()
@@ -641,48 +666,63 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
}
}()

// TODO: implementing cache?
fromCache := false

metricsFetched := 0
valuesFetched := 0
// TODO: should chan buffer size be configurable?
responseChan := make(chan response, 1000)
var fromCache bool
var err error

metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
if expandedGlobs == nil {
if err != nil {
return status.New(codes.InvalidArgument, err.Error()).Err()
fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
var responses []response
responses, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, getMetrics)
return responses, err
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)

metricsFetched := 0
valuesFetched := 0
// TODO: configurable?
metricAccessBatchSize := 100
metricAccessBatch := make([]string, 0, metricAccessBatchSize)
for renderResponse := range responseChan {
err := stream.Send(renderResponse.proto2())
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
accessLogger.Error("fetch failed",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "stream send failed"),
zap.Error(err),
)
return err
}
metricsFetched++
valuesFetched += len(renderResponse.Values)
if listener.internalStatsDir != "" {
metricAccessBatch = append(metricAccessBatch, renderResponse.Name)
if len(metricAccessBatch) >= metricAccessBatchSize {
listener.UpdateMetricsAccessTimesByRequest(metricAccessBatch)
metricAccessBatch = metricAccessBatch[:0]
if listener.queryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
var res interface{}
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
return fetchAndStreamMetricsFunc(true)
})
if err == nil {
listener.prometheus.cacheRequest("query", fromCache)
if fromCache {
atomic.AddUint64(&listener.metrics.QueryCacheHit, 1)
cacheResponse := res.([]response)
go func() {
for _, r := range cacheResponse {
responseChan <- r
}
close(responseChan)
}()
_, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, false)
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
}
} else {
_, err = fetchAndStreamMetricsFunc(false)
}
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
accessLogger.Error("fetch failed",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "stream send failed"),
zap.Error(err),
)
return err
}

if listener.checkRequestCtx(ctx) != nil {
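Taken together, the new Render flow builds the key with a "grpc" suffix (so gRPC and HTTP renders never share an entry), then getWithCache either runs fetchAndStreamMetricsFunc(true), which fetches, streams to the client, and returns the []response for storage, or returns the cached slice. The diff calls getWithCache with a logger, the query cache, the key, a size estimate, a TTL of 60, and a fetch callback; a hypothetical, self-contained illustration of that get-or-compute pattern (not the carbonserver implementation):

// Hypothetical get-or-compute cache, illustrating the getWithCache call shape.
package main

import (
	"fmt"
	"sync"
)

type queryCache struct {
	mu      sync.Mutex
	entries map[string]interface{}
}

// getWithCache returns (value, fromCache, error): on a miss it runs fetch and
// stores the result, on a hit it returns the stored value and skips fetch.
func (c *queryCache) getWithCache(key string, fetch func() (interface{}, error)) (interface{}, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.entries[key]; ok {
		return v, true, nil
	}
	v, err := fetch()
	if err != nil {
		return nil, false, err
	}
	c.entries[key] = v
	return v, false, nil
}

func main() {
	cache := &queryCache{entries: map[string]interface{}{}}
	fetch := func() (interface{}, error) { return []float64{1, 2, 3}, nil }

	v, fromCache, _ := cache.getWithCache("some.metric&100&200&grpc", fetch)
	fmt.Println(v, fromCache) // [1 2 3] false -- miss: fetch ran, result stored

	v, fromCache, _ = cache.getWithCache("some.metric&100&200&grpc", fetch)
	fmt.Println(v, fromCache) // [1 2 3] true  -- hit: served from the cache
}

On a hit nothing has been streamed yet, so the cached responses are pushed back through responseChan by a goroutine and sent with streamMetrics(stream, responseChan, false), which streams them without collecting them into a new slice a second time.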