Add cache for grpc render

emadolsky committed Aug 31, 2022
1 parent 9f83eef commit a60bed3

Showing 2 changed files with 110 additions and 68 deletions.
6 changes: 4 additions & 2 deletions carbonserver/fetchsinglemetric.go
@@ -74,14 +74,16 @@ func (r response) proto2() *protov2.FetchResponse {
 		StartTime: int32(r.StartTime),
 		StopTime:  int32(r.StopTime),
 		StepTime:  int32(r.StepTime),
-		Values:    r.Values,
+		Values:    make([]float64, len(r.Values)),
 		IsAbsent:  make([]bool, len(r.Values)),
 	}

-	for i, p := range resp.Values {
+	for i, p := range r.Values {
 		if math.IsNaN(p) {
 			resp.Values[i] = 0
 			resp.IsAbsent[i] = true
+		} else {
+			resp.Values[i] = p
 		}
 	}

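Note what motivates this hunk: proto2() previously aliased r.Values and rewrote NaN gaps in place, which becomes unsafe once responses are cached and shared between requests; the new code copies into a fresh slice instead. A minimal standalone sketch of the same NaN-to-IsAbsent encoding (the encodeAbsent helper is hypothetical, not part of the repository):

```go
package main

import (
	"fmt"
	"math"
)

// encodeAbsent converts a series whose gaps are NaN into the protobuf-v2
// style pair of slices: values with gaps zeroed plus an IsAbsent mask.
// The input slice is never mutated, which matters once a response is
// cached and shared between requests.
func encodeAbsent(in []float64) (values []float64, isAbsent []bool) {
	values = make([]float64, len(in))
	isAbsent = make([]bool, len(in))
	for i, p := range in {
		if math.IsNaN(p) {
			values[i] = 0
			isAbsent[i] = true
		} else {
			values[i] = p
		}
	}
	return values, isAbsent
}

func main() {
	values, isAbsent := encodeAbsent([]float64{1.5, math.NaN(), 3})
	fmt.Println(values, isAbsent) // [1.5 0 3] [false true false]
}
```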
172 changes: 106 additions & 66 deletions carbonserver/render.go
@@ -283,6 +283,40 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req

 }

+func (listener *CarbonserverListener) getRenderCacheKeyAndSize(targets map[timeRange][]target, format string) (string, uint64) {
+	targetKeys := make([]string, 0, len(targets))
+	for tr, ts := range targets {
+		names := make([]string, 0, len(ts))
+		for _, t := range ts {
+			names = append(names, t.Name)
+		}
+		targetKeys = append(targetKeys, fmt.Sprintf("%s&%d&%d", strings.Join(names, "&"), tr.from, tr.until))
+	}
+	// TODO(gmagnusson): Our cache key changes if we permute any of the
+	// metric names while keeping the from/until pairs fixed, or permute
+	// any combination of metric names and from/until pairs as a unit.
+	// These permutations do not change the response, so we should be more
+	// clever about how we construct the cache key.
+	//
+	// An option is to sort the metric names within each from/until pair,
+	// and to order (names,from,until) triples by the from timestamp.
+	//
+	// This is probably worth doing, as the only source of differences in
+	// from/until pairs I know of comes from using timeShift or timeStack,
+	// which again probably comes from trying to compare trends from last
+	// week to what's going on now, which is something people are generally
+	// interested in. It wouldn't be good to start adding false negative
+	// cache misses to those kinds of queries.
+	key := fmt.Sprintf("%s&%s", strings.Join(targetKeys, "&"), format)
+	size := uint64(100 * 1024 * 1024)
+	renderRequests := atomic.LoadUint64(&listener.metrics.RenderRequests)
+	fetchSize := atomic.LoadUint64(&listener.metrics.FetchSize)
+	if renderRequests > 0 {
+		size = fetchSize / renderRequests
+	}
+	return key, size
+}
+
 func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, bool, error) {
 	logger = logger.With(
 		zap.String("function", "fetchWithCache"),
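The TODO carried over into getRenderCacheKeyAndSize proposes canonicalizing the key: sort the metric names within each from/until pair, then order the (names, from, until) triples deterministically, so permuted but equivalent requests hit the same entry. A hedged sketch of that idea (buildCanonicalCacheKey and its types are hypothetical; targets are reduced to plain strings, and triples are ordered lexicographically, which is deterministic even if not strictly by the from timestamp):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type timeRange struct{ from, until int32 }

// buildCanonicalCacheKey sketches the canonicalization the TODO describes:
// sort the names inside each (from, until) pair, then order the rendered
// (names, from, until) triples, so permuted but equivalent requests map to
// the same cache key instead of missing spuriously.
func buildCanonicalCacheKey(targets map[timeRange][]string, format string) string {
	keys := make([]string, 0, len(targets))
	for tr, names := range targets {
		sorted := append([]string(nil), names...) // copy before sorting
		sort.Strings(sorted)
		keys = append(keys, fmt.Sprintf("%s&%d&%d", strings.Join(sorted, "&"), tr.from, tr.until))
	}
	sort.Strings(keys) // deterministic order across map iteration
	return fmt.Sprintf("%s&%s", strings.Join(keys, "&"), format)
}

func main() {
	a := map[timeRange][]string{{from: 0, until: 60}: {"b.cpu", "a.cpu"}}
	b := map[timeRange][]string{{from: 0, until: 60}: {"a.cpu", "b.cpu"}}
	fmt.Println(buildCanonicalCacheKey(a, "grpc") == buildCanonicalCacheKey(b, "grpc")) // true
}
```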
@@ -292,38 +326,7 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger

 	var response fetchResponse
 	if listener.queryCacheEnabled {
-		targetKeys := make([]string, 0, len(targets))
-		for tr, ts := range targets {
-			names := make([]string, 0, len(ts))
-			for _, t := range ts {
-				names = append(names, t.Name)
-			}
-			targetKeys = append(targetKeys, fmt.Sprintf("%s&%d&%d", strings.Join(names, "&"), tr.from, tr.until))
-		}
-		key := fmt.Sprintf("%s&%s", strings.Join(targetKeys, "&"), format)
-
-		size := uint64(100 * 1024 * 1024)
-		renderRequests := atomic.LoadUint64(&listener.metrics.RenderRequests)
-		fetchSize := atomic.LoadUint64(&listener.metrics.FetchSize)
-		if renderRequests > 0 {
-			size = fetchSize / renderRequests
-		}
-
-		// TODO(gmagnusson): Our cache key changes if we permute any of the
-		// metric names while keeping the from/until pairs fixed, or permute
-		// any combination of metric names and from/until pairs as a unit.
-		// These permutations do not change the response, so we should be more
-		// clever about how we construct the cache key.
-		//
-		// An option is to sort the metric names within each from/until pair,
-		// and to order (names,from,until) triples by the from timestamp.
-		//
-		// This is probably worth doing, as the only source of differences in
-		// from/until pairs I know of comes from using timeShift or timeStack,
-		// which again probably comes from trying to compare trends from last
-		// week to what's going on now, which is something people are generally
-		// interested in. It wouldn't be good to start adding false negative
-		// cache misses to those kinds of queries.
+		key, size := listener.getRenderCacheKeyAndSize(targets, format.String())
 		var res interface{}
 		res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
 			func() (interface{}, error) {
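fetchWithCache keeps its format.String() cache key, while the gRPC path below appends "grpc", so the two handlers' differently-typed cache entries cannot collide. The getWithCache call above implies a cache-aside shape: look up the key, run the fetch closure on a miss, store the result, and report whether the value came from cache. A hedged reduction of that shape (cacheAside is hypothetical; the real getWithCache also takes a size hint and what looks like a 60-second TTL, and may coalesce concurrent fetches):

```go
package main

import (
	"fmt"
	"sync"
)

// cacheAside is a hypothetical reduction of the getWithCache call shape:
// return the cached value when present, otherwise run fetch, store the
// result, and report via fromCache which path was taken. TTL, size-based
// eviction, and single-flight de-duplication are omitted.
type cacheAside struct {
	mu    sync.Mutex
	items map[string]interface{}
}

func (c *cacheAside) get(key string, fetch func() (interface{}, error)) (val interface{}, fromCache bool, err error) {
	c.mu.Lock()
	if v, ok := c.items[key]; ok {
		c.mu.Unlock()
		return v, true, nil
	}
	c.mu.Unlock()
	val, err = fetch()
	if err != nil {
		return nil, false, err
	}
	c.mu.Lock()
	c.items[key] = val
	c.mu.Unlock()
	return val, false, nil
}

func main() {
	c := &cacheAside{items: map[string]interface{}{}}
	v, hit, _ := c.get("k", func() (interface{}, error) { return 42, nil })
	fmt.Println(v, hit) // 42 false
	v, hit, _ = c.get("k", func() (interface{}, error) { return 0, nil })
	fmt.Println(v, hit) // 42 true
}
```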
@@ -587,6 +590,29 @@ func (listener *CarbonserverListener) fetchData(metric, pathExpression string, f
 	return multi, nil
 }

+func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_RenderServer, responseChan chan response, storeAndGetMetrics bool) (responses []response, metricsFetched, valuesFetched int, err error) {
+	var metricAccessBatch []string
+	for renderResponse := range responseChan {
+		err = stream.Send(renderResponse.proto2())
+		if err != nil {
+			atomic.AddUint64(&listener.metrics.RenderErrors, 1)
+			return
+		}
+		if storeAndGetMetrics {
+			responses = append(responses, renderResponse)
+		}
+		metricsFetched++
+		valuesFetched += len(renderResponse.Values)
+		if listener.internalStatsDir != "" {
+			metricAccessBatch = append(metricAccessBatch, renderResponse.Name)
+		}
+	}
+	if listener.internalStatsDir != "" {
+		listener.UpdateMetricsAccessTimesByRequest(metricAccessBatch)
+	}
+	return
+}
+
 // Render implements Render rpc of CarbonV2 gRPC service
 func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) {
 	t0 := time.Now()
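streamMetrics extracts the old send loop into a helper that drains responseChan, forwards each response over the gRPC stream, and keeps a copy of the batch only when storeAndGetMetrics is set, so a cache miss can also populate the query cache; unlike the old loop it flushes metric access times once after the channel closes rather than every hundred names. The underlying drain-and-optionally-collect pattern, as a standalone sketch (the generic drain helper is hypothetical and needs Go 1.18+):

```go
package main

import "fmt"

// drain forwards every value from ch to send, optionally keeping a copy of
// the whole batch (as streamMetrics does when a cache miss must also
// populate the query cache). It stops at the first send error.
func drain[T any](ch <-chan T, send func(T) error, keep bool) (kept []T, n int, err error) {
	for v := range ch {
		if err = send(v); err != nil {
			return kept, n, err
		}
		if keep {
			kept = append(kept, v)
		}
		n++
	}
	return kept, n, nil
}

func main() {
	ch := make(chan int, 3)
	for _, v := range []int{1, 2, 3} {
		ch <- v
	}
	close(ch)
	kept, n, _ := drain(ch, func(v int) error { fmt.Println("sent", v); return nil }, true)
	fmt.Println(kept, n) // [1 2 3] 3
}
```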
@@ -641,48 +667,62 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 		}
 	}()

-	// TODO: implementing cache?
-	fromCache := false
-
+	metricsFetched := 0
+	valuesFetched := 0
 	// TODO: should chan buffer size be configurable?
 	responseChan := make(chan response, 1000)
+	var fromCache bool
+	var err error

-	metricNames := getUniqueMetricNames(targets)
-	// TODO: pipeline?
-	expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
-	if expandedGlobs == nil {
-		if err != nil {
-			return status.New(codes.InvalidArgument, err.Error()).Err()
-		}
-	}
-	metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
-	go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
-
-	metricsFetched := 0
-	valuesFetched := 0
-	// TODO: configurable?
-	metricAccessBatchSize := 100
-	metricAccessBatch := make([]string, 0, metricAccessBatchSize)
-	for renderResponse := range responseChan {
-		err := stream.Send(renderResponse.proto2())
-		if err != nil {
-			atomic.AddUint64(&listener.metrics.RenderErrors, 1)
-			accessLogger.Error("fetch failed",
-				zap.Duration("runtime_seconds", time.Since(t0)),
-				zap.String("reason", "stream send failed"),
-				zap.Error(err),
-			)
-			return err
-		}
-		metricsFetched++
-		valuesFetched += len(renderResponse.Values)
-		if listener.internalStatsDir != "" {
-			metricAccessBatch = append(metricAccessBatch, renderResponse.Name)
-			if len(metricAccessBatch) >= metricAccessBatchSize {
-				listener.UpdateMetricsAccessTimesByRequest(metricAccessBatch)
-				metricAccessBatch = metricAccessBatch[:0]
-			}
-		}
-	}
+	fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
+		metricNames := getUniqueMetricNames(targets)
+		// TODO: pipeline?
+		expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
+		if expandedGlobs == nil {
+			if err != nil {
+				return nil, status.New(codes.InvalidArgument, err.Error()).Err()
+			}
+		}
+		metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
+		go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
+		var responses []response
+		responses, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, getMetrics)
+		return responses, err
+	}
+
+	if listener.queryCacheEnabled {
+		key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
+		var res interface{}
+		res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
+			func() (interface{}, error) {
+				return fetchAndStreamMetricsFunc(true)
+			})
+		if err == nil {
+			listener.prometheus.cacheRequest("query", fromCache)
+			if fromCache {
+				atomic.AddUint64(&listener.metrics.QueryCacheHit, 1)
+				cacheResponse := res.([]response)
+				go func() {
+					for _, r := range cacheResponse {
+						responseChan <- r
+					}
+					close(responseChan)
+				}()
+				_, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, false)
+			} else {
+				atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
+			}
+		}
+	} else {
+		_, err = fetchAndStreamMetricsFunc(false)
+	}
+	if err != nil {
+		accessLogger.Error("fetch failed",
+			zap.Duration("runtime_seconds", time.Since(t0)),
+			zap.String("reason", "stream send failed"),
+			zap.Error(err),
+		)
+		return err
+	}

 	if listener.checkRequestCtx(ctx) != nil {
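The cache-hit path replays the cached []response slice back through responseChan from a goroutine, closing the channel so the consumer's range loop terminates, then reuses streamMetrics with storeAndGetMetrics=false; the miss path streams inside the getWithCache fetch closure, so both paths share one send routine. A standalone sketch of that replay pattern (types are hypothetical stand-ins):

```go
package main

import "fmt"

type response struct{ Name string }

func main() {
	cached := []response{{Name: "a.cpu"}, {Name: "b.cpu"}}
	ch := make(chan response)

	// Producer: replay the cached slice and close the channel so the
	// consumer's range loop terminates, mirroring the cache-hit goroutine.
	go func() {
		for _, r := range cached {
			ch <- r
		}
		close(ch)
	}()

	// Consumer: the same drain loop the miss path uses, with nothing
	// re-stored this time.
	for r := range ch {
		fmt.Println("streamed", r.Name)
	}
}
```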