Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find grpc #488

Merged
merged 4 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 118 additions & 64 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,13 +1666,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
return func(wr http.ResponseWriter, req *http.Request) {
ratelimiter := listener.getPathRateLimiter(req.URL.Path)
// Can't use http.TimeoutHandler here due to supporting per-path timeout
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
newTimeout := listener.getPathRateLimiterTimeout(ratelimiter)
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), newTimeout)
defer cancel()
Expand Down Expand Up @@ -2058,91 +2052,151 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error {
opts = append(opts, grpc.ChainStreamInterceptor(
grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC),
grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request),
listener.StreamServerRatelimitHandler(),
))
listener.StreamServerRatelimitHandler()), grpc.ChainUnaryInterceptor(
grpcutil.UnaryServerTimeHandler(listener.bucketRequestTimesGRPC),
grpcutil.UnaryServerStatusMetricHandler(statusCodes, listener.prometheus.request),
listener.UnaryServerRatelimitHandler()))
grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}

func (listener *CarbonserverListener) getPathRateLimiterTimeout(ratelimiter *ApiPerPathRatelimiter) time.Duration {
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
return newTimeout
}

func (listener *CarbonserverListener) UnaryServerRatelimitHandler() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
t0 := time.Now()
var payload string
if reqStringer, ok := req.(fmt.Stringer); ok {
payload = reqStringer.String()
}
fullMethodName := info.FullMethod
ratelimiter := listener.getPathRateLimiter(fullMethodName) // Can't use http.TimeoutHandler here due to supporting per-path timeout
newTimeout := listener.getPathRateLimiterTimeout(ratelimiter)
if newTimeout > 0 {
newCtx, cancel := context.WithTimeout(ctx, newTimeout)
defer cancel()
ctx = newCtx
}

if err := listener.grpcServerRatelimitHandler(ctx, ratelimiter, payload, t0); err != nil {
return nil, err
}
return handler(ctx, req)
}
}

func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
t0 := time.Now()
fullMethodName := info.FullMethod
wss := grpcutil.GetWrappedStream(ss)
ratelimiter := listener.getPathRateLimiter(fullMethodName) // Can't use http.TimeoutHandler here due to supporting per-path timeout
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
newTimeout := listener.getPathRateLimiterTimeout(ratelimiter)
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(wss.Context(), newTimeout)
defer cancel()
wss.SetContext(ctx)
}

ctx := wss.Context()
var reqPeer string
if p, ok := peer.FromContext(ctx); ok {
reqPeer = p.Addr.String()
if err := listener.grpcServerRatelimitHandler(ctx, ratelimiter, wss.Payload(), t0); err != nil {
return err
}
accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
zap.String("handler", "rate_limit"),
zap.String("payload", wss.Payload()),
zap.String("peer", reqPeer),
))
return handler(srv, ss)
}
}

if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
zap.Int("grpc_code", int(codes.Unavailable)),
)
return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
}
func (listener *CarbonserverListener) grpcServerRatelimitHandler(ctx context.Context, ratelimiter *ApiPerPathRatelimiter, payload string, t0 time.Time) error {
var reqPeer string
if p, ok := peer.FromContext(ctx); ok {
reqPeer = p.Addr.String()
}
accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
zap.String("handler", "rate_limit"),
zap.String("payload", payload),
zap.String("peer", reqPeer),
))

if ratelimiter != nil {
if ratelimiter.Enter() != nil {
accessLogger.Error("request blocked",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "blocked by api per path rate limiter"),
zap.Int("grpc_code", int(codes.InvalidArgument)),
)
return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
}
defer ratelimiter.Exit()
if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
zap.Int("grpc_code", int(codes.Unavailable)),
)
return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
}

// why: if the request is already timeout, there is no
// need to resume execution.
if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("grpc_code", int(codes.ResourceExhausted)),
)
return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
}
if ratelimiter != nil {
if ratelimiter.Enter() != nil {
accessLogger.Error("request blocked",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "blocked by api per path rate limiter"),
zap.Int("grpc_code", int(codes.InvalidArgument)),
)
return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
}
defer ratelimiter.Exit()

// TODO: to deprecate as it's replaced by per-path rate limiting?
//
// rate limit inflight requests
inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
accessLogger.Error("request denied",
// why: if the request is already timeout, there is no
// need to resume execution.
if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "too many requests"),
zap.Int("grpc_code", http.StatusTooManyRequests),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("grpc_code", int(codes.ResourceExhausted)),
)
return status.Error(codes.ResourceExhausted, "too many requests")
return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
}
}

return handler(srv, ss)
// TODO: to deprecate as it's replaced by per-path rate limiting?
//
// rate limit inflight requests
inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "too many requests"),
zap.Int("grpc_code", http.StatusTooManyRequests),
)
return status.Error(codes.ResourceExhausted, "too many requests")
}
return nil
}

func getWithCache(logger *zap.Logger, cache queryCache, key string, size uint64, expire int32, f func() (interface{}, error)) (result interface{}, fromCache bool, err error) {
item := cache.getQueryItem(key, size, expire)
res, ok := item.FetchOrLock()
switch {
case !ok:
logger.Debug("cache miss")
result, err = f()
if err != nil {
item.StoreAbort()
} else {
item.StoreAndUnlock(result)
}
case res != nil:
logger.Debug("cache hit")
result = res
fromCache = true
default:
// Whenever there are multiple requests approaching for the same records,
// and the proceeding request gets an error, waiting requests should get an error too.
err = fmt.Errorf("invalid cache record for the request")
}
return
}
Loading